Module src.analyze_helper
Source code
from matplotlib import pyplot as plt
import os
from os.path import join as oj
import numpy as np
import pandas as pd
import data
from sklearn.model_selection import KFold
from colorama import Fore
import pickle as pkl
import config
import viz
from config import *
def load_results(out_dir):
    '''Load each model's pickled results from out_dir into one summary DataFrame
    (one row per model, with weighted CV metrics and mean/std feature importances).
    '''
    r = []
    for fname in os.listdir(out_dir):
        d = pkl.load(open(oj(out_dir, fname), 'rb'))
        metrics = {k: d['cv'][k] for k in d['cv'].keys() if 'curve' not in k}
        num_pts_by_fold_cv = d['num_pts_by_fold_cv']
        out = {k: np.average(metrics[k], weights=num_pts_by_fold_cv) for k in metrics}
        out.update({k + '_std': np.std(metrics[k]) for k in metrics})
        out['model_type'] = fname.replace('.pkl', '')  # d['model_type']
        imp_mat = np.array(d['imps']['imps'])
        imp_mu = imp_mat.mean(axis=0)
        imp_sd = imp_mat.std(axis=0)
        feat_names = d['feat_names_selected']
        out.update({feat_names[i] + '_f': imp_mu[i] for i in range(len(feat_names))})
        out.update({feat_names[i] + '_std_f': imp_sd[i] for i in range(len(feat_names))})
        r.append(pd.Series(out))
    r = pd.concat(r, axis=1, sort=False).T.infer_objects()
    r = r.reindex(sorted(r.columns), axis=1)  # sort the column names
    r = r.round(3)
    r = r.set_index('model_type')
    return r

def get_data_over_folds(model_names: list, out_dir: str, cell_nums: pd.Series, X, y,
                        outcome_def='y_consec_sig', dset='clath_aux+gak_a7d2'):
    '''Returns predictions/labels over the CV folds in the dataset

    Params
    ------
    cell_nums: pd.Series
        equivalent to df.cell_num

    Returns
    -------
    d_full_cv: pd.DataFrame
        n rows, one for each data point in the training set (over all folds);
        2 columns for each model, one with predictions and one with predicted probabilities
    idxs_cv: np.array
        indices of the validation-set rows (over all folds); for example,
        df.y_thresh.iloc[idxs_cv] yields the labels corresponding to the CV predictions
    '''
    # split the data into folds based on cell number
    d = {}
    cell_nums_train = config.DSETS[dset]['train']
    kf = KFold(n_splits=len(cell_nums_train))
    idxs_cv = []

    # get predictions over all folds and save them into a dict
    if not isinstance(model_names, (list, np.ndarray)):
        model_names = [model_names]
    for i, model_name in enumerate(model_names):
        results_individual = pkl.load(open(f'{out_dir}/{model_name}.pkl', 'rb'))
        fold_num = 0
        for cv_idx, cv_val_idx in kf.split(cell_nums_train):
            # select the samples belonging to this fold's validation cells
            idxs_val_cv = cell_nums.isin(cell_nums_train[np.array(cv_val_idx)])
            X_val_cv, Y_val_cv = X[idxs_val_cv], y[idxs_val_cv]

            # get predictions from the model fit on this fold
            preds, preds_proba = analyze_individual_results(results_individual, X_val_cv, Y_val_cv,
                                                            print_results=False, plot_results=False,
                                                            model_cv_fold=fold_num)
            d[f'{model_name}_{fold_num}'] = preds
            d[f'{model_name}_{fold_num}_proba'] = preds_proba
            if i == 0:
                idxs_cv.append(np.arange(X.shape[0])[idxs_val_cv])
            fold_num += 1

    # concatenate over folds
    d2 = {}
    for model_name in model_names:
        d2[model_name] = np.hstack([d[k] for k in d.keys() if model_name in k and 'proba' not in k])
        d2[model_name + '_proba'] = np.hstack([d[k] for k in d.keys() if model_name in k and 'proba' in k])
    return pd.DataFrame.from_dict(d2), np.hstack(idxs_cv)

def analyze_individual_results(results, X_test, Y_test, print_results=False, plot_results=False, model_cv_fold=0):
    scores_cv = results['cv']
    imps = results['imps']
    m = imps['model'][model_cv_fold]
    preds = m.predict(X_test[results['feat_names_selected']])
    try:
        preds_proba = m.predict_proba(X_test[results['feat_names_selected']])[:, 1]
    except AttributeError:
        # model has no predict_proba; fall back to the hard predictions
        preds_proba = preds

    if print_results:
        print(Fore.CYAN + f'{"metric":<25}\tvalidation')
        for s in results['metrics']:
            if 'curve' not in s:
                print(Fore.WHITE + f'{s:<25}\t{np.mean(scores_cv[s]):.3f} ~ {np.std(scores_cv[s]):.3f}')
        print(Fore.CYAN + '\nfeature importances')
        imp_mat = np.array(imps['imps'])
        imp_mu = imp_mat.mean(axis=0)
        imp_sd = imp_mat.std(axis=0)
        for i, feat_name in enumerate(results['feat_names_selected']):
            print(Fore.WHITE + f'{feat_name:<25}\t{imp_mu[i]:.3f} ~ {imp_sd[i]:.3f}')

    if plot_results:
        plt.figure(figsize=(10, 3), dpi=140)
        R, C = 1, 3
        plt.subplot(R, C, 1)
        viz.plot_confusion_matrix(Y_test, preds, classes=np.array(['Failure', 'Success']))
        plt.subplot(R, C, 2)
        # precision-recall curve stored with the CV results
        prec, rec, thresh = scores_cv['precision_recall_curve'][0]
        plt.plot(rec, prec)
        plt.xlim((-0.1, 1.1))
        plt.ylim((-0.1, 1.1))
        plt.ylabel('Precision')
        plt.xlabel('Recall')
        plt.subplot(R, C, 3)
        plt.hist(preds_proba[Y_test == 0], alpha=0.5, label='Failure')
        plt.hist(preds_proba[Y_test == 1], alpha=0.5, label='Success')
        plt.xlabel('Predicted probability')
        plt.ylabel('Count')
        plt.legend()
        plt.tight_layout()
        plt.show()
    return preds, preds_proba

def load_results_many_models(out_dir, model_names, X_test, Y_test):
    d = {}
    for model_name in model_names:
        results_individual = pkl.load(open(oj(out_dir, f'{model_name}.pkl'), 'rb'))
        preds, preds_proba = analyze_individual_results(results_individual, X_test, Y_test,
                                                        print_results=False, plot_results=False)
        d[model_name] = preds
        d[model_name + '_proba'] = preds_proba
        d[model_name + '_errs'] = preds != Y_test
    df_preds = pd.DataFrame.from_dict(d)
    return df_preds

# normalize features and store the normalization parameters
def normalize(df, outcome_def):
    X = df[data.get_feature_names(df)]
    X_mean = X.mean()
    X_std = X.std()
    ks = list(X.keys())
    norms = {ks[i]: {'mu': X_mean[i], 'std': X_std[i]}
             for i in range(len(ks))}
    X = (X - X_mean) / X_std
    y = df[outcome_def].values
    return X, y, norms

def normalize_and_predict(m0, feat_names_selected, dset_name, normalize_by_train,
                          exclude_easy_tracks=False, outcome_def='y_consec_thresh'):
    df_new = data.get_data(dset=dset_name, use_processed=True,
                           use_processed_dicts=True, outcome_def=outcome_def,
                           previous_meta_file=oj(DIR_PROCESSED,
                                                 'metadata_clath_aux+gak_a7d2.pkl'))
    if exclude_easy_tracks:
        df_new = df_new[df_new['valid']]  # exclude test cells, short/long tracks, hotspots

    # impute (only does anything for dynamin data)
    df_new = df_new.fillna(df_new.median())
    X_new = df_new[data.get_feature_names(df_new)]
    if normalize_by_train:
        # X_mean_train / X_std_train are not defined in this module;
        # they must be available in the calling scope (computed from the training data)
        X_new = (X_new - X_mean_train) / X_std_train
    else:
        X_new = (X_new - X_new.mean()) / X_new.std()
    y_new = df_new[outcome_def].values
    preds_new = m0.predict(X_new[feat_names_selected])
    preds_proba_new = m0.predict_proba(X_new[feat_names_selected])[:, 1]
    Y_maxes = df_new['Y_max']
    return df_new, y_new, preds_new, preds_proba_new, Y_maxes

def calc_errs(preds, y_full_cv):
    '''Return boolean masks for true positives, true negatives, false positives, and false negatives.'''
    tp = np.logical_and(preds == 1, y_full_cv == 1)
    tn = np.logical_and(preds == 0, y_full_cv == 0)
    fp = preds > y_full_cv
    fn = preds < y_full_cv
    return tp, tn, fp, fn
Functions
def analyze_individual_results(results, X_test, Y_test, print_results=False, plot_results=False, model_cv_fold=0)
Compute predictions and predicted probabilities for one CV-fold model on (X_test, Y_test); optionally print the CV metrics and feature importances, and plot diagnostic figures.
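A minimal usage sketch; the pickle path and the X_test / Y_test variables are placeholders, not part of this module:

import pickle as pkl
# load one model's results pickle produced by the training pipeline (placeholder path)
results = pkl.load(open('results/logistic.pkl', 'rb'))
preds, preds_proba = analyze_individual_results(results, X_test, Y_test,
                                                print_results=True, plot_results=False,
                                                model_cv_fold=0)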
def calc_errs(preds, y_full_cv)
Return boolean masks for true positives, true negatives, false positives, and false negatives, given binary predictions and labels.
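For example, with small arrays of binary predictions and labels:

import numpy as np
preds = np.array([1, 0, 1, 0, 1])
labels = np.array([1, 0, 0, 1, 1])
tp, tn, fp, fn = calc_errs(preds, labels)
print(tp.sum(), tn.sum(), fp.sum(), fn.sum())   # 2 1 1 1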
def get_data_over_folds(model_names, out_dir, cell_nums, X, y, outcome_def='y_consec_sig', dset='clath_aux+gak_a7d2')
Returns predictions/labels over the CV folds in the dataset.

Params
cell_nums : pd.Series
    equivalent to df.cell_num

Returns
d_full_cv : pd.DataFrame
    n rows, one for each data point in the training set (over all folds);
    2 columns for each model, one with predictions and one with predicted probabilities
idxs_cv : np.array
    indices of the validation-set rows (over all folds); for example,
    df.y_thresh.iloc[idxs_cv] yields the labels corresponding to the CV predictions
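A usage sketch, assuming a hypothetical results directory containing a model pickle named 'logistic'; df, X, and y stand for the full training DataFrame, feature matrix, and label array:

d_full_cv, idxs_cv = get_data_over_folds(['logistic'], 'results', df['cell_num'], X, y)
y_full_cv = y[idxs_cv]                                     # labels aligned with the CV predictions
tp, tn, fp, fn = calc_errs(d_full_cv['logistic'].values, y_full_cv)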
def load_results(out_dir)
Load each model's pickled results from out_dir into a single summary DataFrame, one row per model, with weighted CV metrics and mean/std feature importances.
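A usage sketch, assuming the training pipeline saved one pickle per model in a hypothetical 'results' directory:

df_results = load_results('results')     # one row per model, columns sorted alphabetically
print(df_results.filter(like='_std'))    # standard deviations of the CV metrics / importances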
def load_results_many_models(out_dir, model_names, X_test, Y_test)
Run several models on the same held-out set and collect their predictions, predicted probabilities, and error indicators in one DataFrame.
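A usage sketch; the directory, model names, and held-out X_test / Y_test are placeholders:

df_preds = load_results_many_models('results', ['logistic', 'gb'], X_test, Y_test)
print(df_preds[['logistic_errs', 'gb_errs']].mean())   # per-model error rate on the held-out set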
def normalize(df, outcome_def)
Z-score the feature columns of df and return the normalized features, the outcome vector, and the per-feature mean/std used for normalization.
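A usage sketch, assuming df is a processed tracks DataFrame whose feature columns are recognized by data.get_feature_names:

X, y, norms = normalize(df, outcome_def='y_consec_thresh')
print(norms[next(iter(norms))])           # {'mu': ..., 'std': ...} for one feature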
def normalize_and_predict(m0, feat_names_selected, dset_name, normalize_by_train, exclude_easy_tracks=False, outcome_def='y_consec_thresh')
Load a dataset by name, normalize its features (by training-set statistics or its own), and return the DataFrame along with its labels, predictions, and predicted probabilities from the fitted model m0.
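A usage sketch; results stands for a loaded results pickle, and the dataset name below is a placeholder that must be a key of config.DSETS:

m0 = results['imps']['model'][0]                          # model fit on the first CV fold
feat_names_selected = results['feat_names_selected']
df_new, y_new, preds_new, preds_proba_new, Y_maxes = normalize_and_predict(
    m0, feat_names_selected, dset_name='some_new_dset',   # placeholder dataset name
    normalize_by_train=False)
print((preds_new == y_new).mean())                        # accuracy on the new dataset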