
A library for making stability analysis simple. Easily evaluate the effect of judgment calls in your data-science pipeline (e.g. the choice of imputation strategy)!


Why use vflow?

Using vflow's simple wrappers facilitates many best practices for data science, as laid out in the predictability, computability, and stability (PCS) framework for veridical data science. The goal of vflow is to make it easy to build data-science pipelines that follow PCS by providing intuitive low-code syntax, efficient and flexible computational backends via Ray, and well-documented, reproducible experimentation via MLflow.

Computation: automatic parallelization and caching throughout the pipeline
Reproducibility: automatic experiment tracking and saving
Prediction: filter the pipeline by training and validation performance
Stability: replace a single function (e.g. preprocessing) with a set of functions and easily assess the stability of downstream results

Here we show a simple example of an entire data-science pipeline with several perturbations (e.g. different data subsamples, models, and metrics), written using vflow.

import sklearn
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, balanced_accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier

from vflow import Vset, init_args

# initialize data
X, y = make_classification()
X_train, X_test, y_train, y_test = init_args(
    train_test_split(X, y),
    names=["X_train", "X_test", "y_train", "y_test"],  # optionally name the args
)

# subsample data
subsampling_funcs = [sklearn.utils.resample for _ in range(3)]
subsampling_set = Vset(
    name="subsampling", vfuncs=subsampling_funcs, output_matching=True
)
X_trains, y_trains = subsampling_set(X_train, y_train)

# fit models
models = [LogisticRegression(), DecisionTreeClassifier()]
modeling_set = Vset(name="modeling", vfuncs=models, vfunc_keys=["LR", "DT"])
modeling_set.fit(X_trains, y_trains)
preds_test = modeling_set.predict(X_test)

# get metrics
binary_metrics_set = Vset(
    name="binary_metrics",
    vfuncs=[accuracy_score, balanced_accuracy_score],
    vfunc_keys=["Acc", "Bal_Acc"],
)
binary_metrics = binary_metrics_set.evaluate(preds_test, y_test)

Once we've written this pipeline, we can easily measure the stability of metrics (e.g. "Accuracy") to our choice of subsampling or model.

Documentation

See the docs for a reference on the API.

Notebook examples

Note that some of these require more dependencies than just those required for vflow. To install all, run pip install vflow[nb].

Synthetic classification

Enhancer genomics

fMRI voxel prediction

Fashion mnist classification

Feature importance stability

Clinical decision rule vetting

Installation

Stable version

pip install vflow

Development version (unstable)

pip install vflow@git+https://github.com/Yu-Group/veridical-flow

References

@software{duncan2020vflow,
   author = {Duncan, James and Kapoor, Rush and Agarwal, Abhineet and Singh, Chandan and Yu, Bin},
   doi = {10.21105/joss.03895},
   month = {1},
   title = {{VeridicalFlow: a Python package for building trustworthy data science pipelines with PCS}},
   url = {https://doi.org/10.21105/joss.03895},
   year = {2022}
}
"""
.. include:: ../README.md
"""

from .helpers import (
    build_vset,
    cum_acc_by_uncertainty,
    filter_vset_by_metric,
    init_args,
)
from .pipeline import PCSPipeline, build_graph
from .subkey import Subkey
from .utils import (
    apply_vfuncs,
    base_dict,
    combine_dicts,
    combine_keys,
    dict_data,
    dict_keys,
    dict_to_df,
    init_step,
    perturbation_stats,
    sep_dicts,
    to_list,
    to_tuple,
)
from .vfunc import AsyncVfunc, Vfunc, VfuncPromise
from .vset import Vset

__all__ = [
    # vflow.helpers
    "init_args",
    "build_vset",
    "filter_vset_by_metric",
    "cum_acc_by_uncertainty",
    # vflow.pipeline
    "PCSPipeline",
    "build_graph",
    # vflow.subkey
    "Subkey",
    # vflow.utils
    "apply_vfuncs",
    "base_dict",
    "combine_dicts",
    "combine_keys",
    "dict_data",
    "dict_keys",
    "dict_to_df",
    "init_step",
    "perturbation_stats",
    "sep_dicts",
    "to_list",
    "to_tuple",
    # vflow.vfunc
    "Vfunc",
    "AsyncVfunc",
    "VfuncPromise",
    # vflow.vset
    "Vset",
]

Sub-modules

vflow.helpers

User-facing helper functions included at import vflow

vflow.pipeline

Class that stores the entire pipeline of steps in a data-science workflow

vflow.subkey

Defines a parameter from some origin Vset

vflow.utils

Useful functions for converting between different types (dicts, lists, tuples, etc.)

vflow.vfunc

A perturbation that can be used as a step in a pipeline

vflow.vset

Set of vfuncs to be parallelized over in a pipeline. Function arguments are each a list

Functions

def apply_vfuncs(vfuncs: dict, data_dict: dict, lazy: bool = False)

Apply a dictionary of functions vfuncs to each item of data_dict, optionally returning a dictionary of VfuncPromise objects if lazy is True

Output keys are determined by applying combine_keys() to each pair of items from vfuncs and data_dict. This function is used by all Vsets to apply functions.

Parameters

vfuncs : dict
Dictionary of functions to apply to data_dict.
data_dict : dict
Dictionary of parameters to call each function in vfuncs.
lazy : bool (optional), default False
If True, vfuncs are applied lazily, returning VfuncPromise objects.

Returns

out_dict : dict
Output dictionary of applying vfuncs to data_dict.
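To illustrate how output keys are formed, here is a simplified, standalone sketch of the apply step. It assumes keys are plain tuples of strings and skips Subkey matching, laziness, caching, Ray, and the PREV_KEY entry; `apply_funcs_sketch` is a hypothetical name, not part of vflow.

```python
from typing import Callable, Dict, Tuple

Key = Tuple[str, ...]


def apply_funcs_sketch(
    funcs: Dict[Key, Callable], data: Dict[Key, tuple]
) -> Dict[Key, object]:
    """Call every function on every data entry; output key = data key + function key.

    Simplified stand-in for apply_vfuncs: keys are plain string tuples, so no
    Subkey matching, laziness, caching, or Ray handling takes place.
    """
    out = {}
    for f_key, func in funcs.items():
        if not data:
            # no inputs at all: call the function with no arguments
            out[f_key] = func()
        for d_key, args in data.items():
            # each data value is a tuple of positional arguments, expanded with *
            out[d_key + f_key] = func(*args)
    return out


funcs = {("square",): lambda x: x * x, ("negate",): lambda x: -x}
data = {("a",): (2,), ("b",): (3,)}
result = apply_funcs_sketch(funcs, data)
```

With two functions and two data entries, the sketch produces four outputs, one per (data, function) pair, with the data key as the prefix, matching the order used by `combine_keys(data_k, vf_k)`.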
def apply_vfuncs(vfuncs: dict, data_dict: dict, lazy: bool = False):
    """Apply a dictionary of functions `vfuncs` to each item of `data_dict`,
    optionally returning a dictionary of `vflow.vfunc.VfuncPromise` objects if `lazy` is True

    Output keys are determined by applying `combine_keys` to each pair of items from
    `vfuncs` and `data_dict`. This function is used by all Vsets to apply functions.

    Parameters
    ----------
    vfuncs: dict
        Dictionary of functions to apply to `data_dict`.
    data_dict: dict
        Dictionary of parameters to call each function in `vfuncs`.
    lazy: bool (optional), default False
        If True, `vfuncs` are applied lazily, returning `vflow.vfunc.VfuncPromise`
        objects.

    Returns
    -------
    out_dict: dict
        Output dictionary of applying `vfuncs` to `data_dict`.
    """
    out_dict = {}
    for vf_k in vfuncs:
        if len(data_dict) == 0:
            func = deepcopy(vfuncs[vf_k])
            if lazy:
                out_dict[vf_k] = VfuncPromise(func)
            else:
                out_dict[vf_k] = func()
        for data_k in data_dict:
            if PREV_KEY in (vf_k, data_k):
                continue

            combined_key = combine_keys(data_k, vf_k)

            if not len(combined_key) > 0:
                continue

            func = deepcopy(vfuncs[vf_k])
            if lazy:
                # return a promise
                out_dict[combined_key] = VfuncPromise(func, *data_dict[data_k])
            else:
                data_list = list(data_dict[data_k])
                for i, data in enumerate(data_list):
                    if isinstance(data, VfuncPromise):
                        data_list[i] = data()
                    if isinstance(func, RayRemoteFun) and not isinstance(
                        data_list[i], ray.ObjectRef
                    ):
                        # send data to Ray's remote object store
                        data_list[i] = ray.put(data_list[i])
                    elif isinstance(data_list[i], ray.ObjectRef):
                        # this is not a remote function so get the data
                        data_list[i] = ray.get(data_list[i])
                out_dict[combined_key] = func(*data_list)

    return out_dict
def base_dict(d: dict)

Remove PREV_KEY from dict d if present

def base_dict(d: dict):
    """Remove PREV_KEY from dict d if present"""
    return {k: v for k, v in d.items() if k != PREV_KEY}
def build_graph(node, draw=True)

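Helper function that calls build_graph_recur on an empty networkx.DiGraph, tracing the pipeline backward from node via its PREV_KEY pointers, and optionally draws the result.

Parameters

node : dict or Vset
The output dict or Vset at which to start tracing the pipeline backward. When node is a dict, its producing step is connected to a final "End" node.
draw : bool, default True
If True, draw the graph with nx.draw.

Returns

G : nx.DiGraph
Directed graph whose edges point from each step to the step that consumes its output.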
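The backward traversal can be sketched without networkx. This assumes each step exposes a `prev` list (a hypothetical attribute standing in for vflow's PREV_KEY pointers) and collects edges into a set instead of a DiGraph; `Step` and `collect_edges` are illustrative names only.

```python
from dataclasses import dataclass, field
from typing import List, Set, Tuple, Union


@dataclass(eq=False)
class Step:
    """Hypothetical stand-in for a Vset: a name plus pointers to earlier steps."""

    name: str
    prev: List[Union["Step", str]] = field(default_factory=list)


def collect_edges(
    node: Union[Step, str], edges: Set[Tuple[str, str]]
) -> Set[Tuple[str, str]]:
    """Recursively add (predecessor, node) edges, stopping at string start nodes."""
    if isinstance(node, str):
        # base case: reached a starting node such as "init"
        return edges
    for p in node.prev:
        p_name = p if isinstance(p, str) else p.name
        edges.add((p_name, node.name))
        collect_edges(p, edges)
    return edges


subsampling = Step("subsampling", ["init"])
modeling = Step("modeling", [subsampling, "init"])
metrics = Step("binary_metrics", [modeling, "init"])
edges = collect_edges(metrics, set())
```

Like the original, edges point from the producer to the consumer, so starting at the last step recovers the whole pipeline.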
 
def build_graph(node, draw=True):
    """Helper function that just calls build_graph_recur with an empty graph

    Parameters
    ----------
    node: dict or Vset
    draw: bool, default True
        If True, draw the graph with `nx.draw`.

    Returns
    -------
    G: nx.DiGraph
    """

    def unnest_node(node):
        """Unnest a node, if necessary (i.e., when node is a tuple)

        Parameters
        ----------
        node: str, dict, Vset, or tuple

        Returns
        -------
        unnested_node: str, Vset, or None
        """
        node_type = type(node)
        if node_type is str or "Vset" in str(node_type):
            return node
        if node_type is tuple:
            return unnest_node(node[0])
        return None

    def build_graph_recur(node, G):
        """Builds a graph up using __prev__ and PREV_KEY pointers

        Parameters
        ----------
        node: str, dict, Vset, or tuple
        G: nx.DiGraph

        Returns
        -------
        G: nx.DiGraph
        """
        # base case: reached starting node
        if isinstance(node, str):
            return G

        # initial case: starting at dict
        if isinstance(node, dict):
            s_node = "End"
            nodes_prev = node[PREV_KEY]
            G.add_edge(nodes_prev[0], s_node)
            for node_prev in nodes_prev[1:]:
                G.add_edge(unnest_node(node_prev), nodes_prev[0])
                G = build_graph_recur(node_prev, G)
            return G

        # main case: at a vfuncset
        if "Vset" in str(type(node)):
            if hasattr(node, PREV_KEY):
                nodes_prev = getattr(node, PREV_KEY)
                for node_prev in nodes_prev:
                    G.add_edge(unnest_node(node_prev), node)
                    G = build_graph_recur(node_prev, G)
            return G

        # nested prev key case
        if isinstance(node, tuple):
            func_node = unnest_node(node[0])
            G = build_graph_recur(func_node, G)
            for arg_node in node[1:]:
                G.add_edge(unnest_node(arg_node), func_node)
                G = build_graph_recur(arg_node, G)
            return G

        return G

    G = nx.DiGraph()
    G = build_graph_recur(node, G)
    if draw:
        nx.draw(G, with_labels=True, node_color="#CCCCCC")
    return G
def build_vset(name: str, func, param_dict=None, reps: int = 1, is_async: bool = False, output_matching: bool = False, lazy: bool = False, cache_dir: str = None, tracking_dir: str = None, **kwargs) ‑> Vset

Builds a new Vset by currying or instantiating callable func with all combinations of parameters in param_dict and optional additional **kwargs. If func and param_dict are lists, then the ith entry of func will be curried with ith entry of param_dict. If only one of func or param_dict is a list, the same func/param_dict will be curried for all entries in the list. Vfuncs are named with param_dict items as tuples of str("param_name=param_val").

Parameters

name : str
A name for the output Vset.
func : callable or list[callable]
A callable to use as the base for Vfuncs in the output Vset. Can also be a class object, in which case the class is immediately instantiated with the parameter combinations from param_dict. Can also be a list of callables, where the ith entry corresponds to param_dict or the ith entry of param_dict (if param_dict is a list).
param_dict : dict[str, list] or list[dict[str, list]], optional
A dict with string keys corresponding to argument names of func and entries which are lists of values to pass to func at run time (or when instantiating func if it's a class object). Can also be a list of dicts, where the ith dict entry corresponds to func or the ith entry of func (if func is a list). If no parameters are required for the ith function, the ith entry of param_dict can be None.
reps : int, optional
The number of times to repeat func in the output Vset's vfuncs for each combination of the parameters in param_dict.
is_async : bool, optional
If True, vfuncs are computed asynchronously.
output_matching : bool, optional
If True, then output keys from Vset will be matched when used in other Vsets.
lazy : bool, optional
If True, vfuncs are computed lazily, i.e., returning VfuncPromise objects.
cache_dir : str, optional
If provided, do caching and use cache_dir as the data store for joblib.Memory.
tracking_dir : str, optional
If provided, use the mlflow.tracking API to log outputs as metrics with parameters determined by input keys.
**kwargs
Additional fixed keyword arguments to pass to func.

Returns

new_vset : Vset
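The currying step can be pictured with a stripped-down sketch built on `itertools.product` and `functools.partial`. It covers only a single callable with a parameter grid and `reps`; it skips Vfunc/Vset wrapping, class instantiation, list-valued `func`, and fixed `**kwargs`. `curry_grid` and `power` are hypothetical names.

```python
from functools import partial
from itertools import product
from typing import Callable, Dict, List, Optional, Tuple


def curry_grid(
    func: Callable, param_dict: Optional[Dict[str, list]] = None, reps: int = 1
) -> Tuple[List[Tuple[str, ...]], List[Callable]]:
    """Return (keys, funcs): one partial of func per parameter combination and rep."""
    param_dict = param_dict or {}
    keys, funcs = [], []
    # product() over an empty grid yields a single empty combination
    for values in product(*param_dict.values()):
        kwargs = dict(zip(param_dict.keys(), values))
        key = (f"func={func.__name__}",) + tuple(f"{k}={v}" for k, v in kwargs.items())
        for i in range(reps):
            # prefix a rep index only when repeating, as build_vset's keys do
            keys.append((f"rep={i}",) + key if reps > 1 else key)
            funcs.append(partial(func, **kwargs))
    return keys, funcs


def power(x, p=1, scale=1):
    return scale * x**p


keys, funcs = curry_grid(power, {"p": [1, 2], "scale": [1, 10]})
```

A 2 x 2 grid yields four curried functions, keyed by tuples such as `("func=power", "p=1", "scale=1")`.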
 
def build_vset(
    name: str,
    func,
    param_dict=None,
    reps: int = 1,
    is_async: bool = False,
    output_matching: bool = False,
    lazy: bool = False,
    cache_dir: str = None,
    tracking_dir: str = None,
    **kwargs,
) -> Vset:
    """Builds a new Vset by currying or instantiating callable `func` with all
    combinations of parameters in `param_dict` and optional additional `**kwargs`.
    If `func` and `param_dict` are lists, then the ith entry of `func` will be
    curried with ith entry of `param_dict`. If only one of `func` or `param_dict`
    is a list, the same `func`/`param_dict` will be curried for all entries in the
    list. Vfuncs are named with `param_dict` items as tuples of
    str("param_name=param_val").

    Parameters
    ----------
    name : str
        A name for the output Vset.
    func : callable or list[callable]
        A callable to use as the base for Vfuncs in the output Vset. Can also be
        a class object, in which case the class is immediately instantiated with
        the parameter combinations from `param_dict`. Can also be a list of
        callables, where the ith entry corresponds to `param_dict` or the ith
        entry of `param_dict` (if `param_dict` is a list).
    param_dict : dict[str, list] or list[dict[str, list]], optional
        A dict with string keys corresponding to argument names of `func` and
        entries which are lists of values to pass to `func` at run time (or when
        instantiating `func` if it's a class object). Can also be a list of
        dicts, where the ith dict entry corresponds to `func` or the ith entry
        of `func` (if `func` is a list). If no parameters are required for the
        ith function, the ith entry of `param_dict` can be `None`.
    reps : int, optional
        The number of times to repeat `func` in the output Vset's vfuncs for
        each combination of the parameters in `param_dict`.
    is_async : bool, optional
        If True, vfuncs are computed asynchronously.
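    output_matching : bool, optional
        If True, then output keys from Vset will be matched when used in other
        Vsets.
    lazy : bool, optional
        If True, vfuncs are computed lazily, i.e., returning
        `vflow.vfunc.VfuncPromise` objects.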
    cache_dir : str, optional
        If provided, do caching and use `cache_dir` as the data store for
        joblib.Memory.
    tracking_dir : str, optional
        If provided, use the mlflow.tracking API to log outputs as metrics with
        parameters determined by input keys.
    **kwargs
        Additional fixed keyword arguments to pass to `func`.

    Returns
    -------
    new_vset : vflow.vset.Vset

    """
    f_list = []
    pd_list = []
    if isinstance(func, list):
        if isinstance(param_dict, list):
            assert len(param_dict) == len(
                func
            ), "list of param_dicts must be same length as list of funcs"
            f_list.extend(func)
            pd_list.extend(param_dict)
        else:
            pd_list.extend([param_dict] * len(func))
            f_list.extend(func)
    elif isinstance(param_dict, list):
        f_list.extend([func] * len(param_dict))
        pd_list.extend(param_dict)
    else:
        f_list.append(func)
        pd_list.append(param_dict)

    vfuncs = []
    vkeys = []

    for f, pd in zip(f_list, pd_list):
        if pd is None:
            pd = {}
        assert callable(f), "func must be callable"

        kwargs_tuples = product(*list(pd.values()))
        for tup in kwargs_tuples:
            kwargs_dict = {}
            vkey_tup = (f"func={f.__name__}",)
            for param_name, param_val in zip(list(pd.keys()), tup):
                kwargs_dict[param_name] = param_val
                vkey_tup += (f"{param_name}={param_val}",)
            # add additional fixed kwargs to kwargs_dict
            for k, v in kwargs.items():
                kwargs_dict[k] = v
            for i in range(reps):
                # add vfunc key to vkeys
                if reps > 1:
                    vkeys.append((f"rep={i}",) + vkey_tup)
                else:
                    vkeys.append(vkey_tup)
                # check if func is a class
                if isinstance(f, type):
                    # instantiate func
                    vfuncs.append(Vfunc(vfunc=f(**kwargs_dict), name=str(vkey_tup)))
                else:
                    # use partial to wrap func
                    vfuncs.append(
                        Vfunc(vfunc=partial(f, **kwargs_dict), name=str(vkey_tup))
                    )
    if all(pd is None for pd in pd_list) and reps == 1:
        vkeys = None

    return Vset(
        name,
        vfuncs,
        is_async=is_async,
        vfunc_keys=vkeys,
        output_matching=output_matching,
        lazy=lazy,
        cache_dir=cache_dir,
        tracking_dir=tracking_dir,
    )
def combine_dicts(*args: dict, base_case=True)

Combines any number of dictionaries into a single dictionary. Dictionaries are combined left to right, matching all keys according to combine_keys().

Parameters

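*args : dict
Dictionaries to recursively combine left to right.
base_case : bool, default True
Internal flag for the recursion: if True, values from the first two dicts are paired into a new tuple; if False, the left values are already tuples and the right value is appended.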

Returns

combined_dict : dict
Combined dictionary.
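The left-to-right accumulation of values into argument tuples can be sketched as follows. This assumes plain string-tuple keys that are simply concatenated, so no Subkey matching or PREV_KEY handling occurs; `combine_dicts_sketch` is a hypothetical name.

```python
from typing import Dict, Tuple

Key = Tuple[str, ...]


def combine_dicts_sketch(*dicts: Dict[Key, object]) -> Dict[Key, tuple]:
    """Pair every entry of each dict with every entry of the next, left to right.

    Values accumulate into tuples so they can later be *-expanded as positional
    arguments. Keys are concatenated; there is no Subkey matching.
    """
    if not dicts:
        return {}
    # start from 1-tuples, as the single-dict case does
    combined = {k: (v,) for k, v in dicts[0].items()}
    for d in dicts[1:]:
        combined = {
            k0 + k1: v0 + (v1,)
            for k0, v0 in combined.items()
            for k1, v1 in d.items()
        }
    return combined


X = {("X_a",): "xa", ("X_b",): "xb"}
y = {("y",): "yy"}
pairs = combine_dicts_sketch(X, y)
```

Each combined value is a tuple such as `("xa", "yy")`, ready to be passed to a function as `func(*args)`.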
def combine_dicts(*args: dict, base_case=True):
    """Combines any number of dictionaries into a single dictionary. Dictionaries
    are combined left to right matching all keys according to `combine_keys`

    Parameters
    ----------
    *args: dict
        Dictionaries to recursively combine left to right.

    Returns
    -------
    combined_dict: dict
        Combined dictionary.
    """
    n_args = len(args)
    combined_dict = {}
    if n_args == 0:
        return combined_dict
    if n_args == 1:
        for k in args[0]:
            # wrap the dict values in tuples; this is helpful so that when we
            # pass the values to a vfunc fun in we can just use * expansion
            if k != PREV_KEY:
                combined_dict[k] = (args[0][k],)
            else:
                combined_dict[k] = args[0][k]
        return combined_dict
    if n_args == 2:
        for k0 in args[0]:
            for k1 in args[1]:
                if PREV_KEY in (k0, k1):
                    continue

                combined_key = combine_keys(k0, k1)

                if len(combined_key) > 0:
                    if base_case:
                        combined_dict[combined_key] = (args[0][k0], args[1][k1])
                    else:
                        combined_dict[combined_key] = args[0][k0] + (args[1][k1],)

        return combined_dict
    # combine the first two dicts and call recursively with remaining args
    return combine_dicts(combine_dicts(args[0], args[1]), *args[2:], base_case=False)
def combine_keys(left_key, right_key)

Combines left_key and right_key, attempting to match on any Subkey where Subkey.is_matching() is True.

Returns an empty key on failed matches when Subkey.mismatches() is True. Always filters on right_key and returns combined_key with left_key prefix.

Parameters

left_key : tuple
Left tuple key to combine.
right_key : tuple
Right tuple key to combine.

Returns

combined_key : tuple
Combined tuple key filtered according to Subkey.matches() rules; empty if any subkey pair mismatches according to Subkey.mismatches().
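The matching rule can be illustrated with a simplified subkey. This sketch assumes a subkey matches another when both origin and value agree, and mismatches when the origins agree but the values differ (consistent with the source comment "subkeys with same origin but different values are rejected"); the real `Subkey.matches`/`Subkey.mismatches` may check more. `SK` and `combine_keys_sketch` are hypothetical names.

```python
from typing import NamedTuple, Tuple


class SK(NamedTuple):
    """Hypothetical simplified subkey: a value, its origin Vset, and a matching flag."""

    value: str
    origin: str
    matching: bool = False


def combine_keys_sketch(left: Tuple[SK, ...], right: Tuple[SK, ...]) -> Tuple[SK, ...]:
    # the shorter key supplies the candidate subkeys to match on
    match_key, compare_key = (left, right) if len(left) < len(right) else (right, left)
    matched = []
    for sk in (s for s in match_key if s.matching):
        for other in compare_key:
            if sk.origin == other.origin and sk.value == other.value:
                matched.append(sk)
                break
            if sk.origin == other.origin:
                # same origin, different value: reject the pair entirely
                return ()
    # drop matched subkeys from the right key so they are not duplicated
    return left + tuple(s for s in right if s not in matched)


sub0 = SK("sub0", "subsampling", True)
sub1 = SK("sub1", "subsampling", True)
lr = SK("LR", "modeling")
x_tr = SK("X_train", "init")
```

Predictions from subsample `sub0` combine with labels from `sub0` but not with labels from `sub1`, which is how output matching keeps subsamples aligned across Vsets.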
def combine_keys(left_key, right_key):
    """Combines `left_key` and `right_key`, attempting to match on any `Subkey` where
    `vflow.subkey.Subkey.is_matching` is `True`.

    Returns an empty key on failed matches when
    `vflow.subkey.Subkey.mismatches` is `True`. Always filters on `right_key`
    and returns `combined_key` with `left_key` prefix.

    Parameters
    ----------
    left_key: tuple
        Left tuple key to combine.
    right_key: tuple
        Right tuple key to combine.

    Returns
    -------
    combined_key: tuple
        Combined tuple key filtered according to `vflow.subkey.Subkey.matches` rules,
        which is empty according to `vflow.subkey.Subkey.mismatches` rule.

    """
    if len(left_key) < len(right_key):
        match_key = left_key
        compare_key = right_key
    else:
        match_key = right_key
        compare_key = left_key
    match_subkeys = [subkey for subkey in match_key if subkey.is_matching()]
    if len(match_subkeys) > 0:
        matched_subkeys = []
        for subkey in match_subkeys:
            for c_subkey in compare_key:
                if subkey.matches(c_subkey):
                    matched_subkeys.append(subkey)
                    break
                if subkey.mismatches(c_subkey):
                    # subkeys with same origin but different values are rejected
                    return ()
        if len(matched_subkeys) > 0:
            # always filter on right key
            filtered_key = tuple(
                subkey for subkey in right_key if subkey not in matched_subkeys
            )
            combined_key = left_key + filtered_key
            return combined_key
        return left_key + right_key
    return left_key + right_key
def cum_acc_by_uncertainty(mean_preds, std_preds, true_labels)

Returns uncertainty and cumulative accuracy for grouped class predictions, sorted in increasing order of uncertainty

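Parameters

mean_preds : dict
Mean predictions, output from Vset.predict_with_uncertainties.
std_preds : dict
Standard-deviation predictions, output from Vset.predict_with_uncertainties.
true_labels : dict or list-like
True binary labels; a dict must contain a single 1D vector entry.

Returns

uncertainty : numpy.ndarray
Standard deviations of the class-1 predictions, sorted in increasing order for each key.
cum_acc : numpy.ndarray
Cumulative accuracy of the rounded mean predictions in that order.
sorted_idx : numpy.ndarray
Indices that sort each row of the standard deviations.

TODO: generalize to multi-class classification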
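For a single model, the computation reduces to sorting observations by uncertainty and tracking a running accuracy. This stdlib sketch assumes `prob_pos` holds the mean class-1 probability (the second column selected by `[:, :, 1]` in the source) and that labels are 0/1; the original does this row-wise with numpy for every key. `cum_acc_sketch` is a hypothetical name.

```python
from typing import List, Sequence, Tuple


def cum_acc_sketch(
    prob_pos: Sequence[float], std: Sequence[float], labels: Sequence[int]
) -> Tuple[List[float], List[float], List[int]]:
    """Sort observations by increasing uncertainty and report running accuracy."""
    order = sorted(range(len(std)), key=lambda i: std[i])
    uncertainty, cum_acc = [], []
    n_correct = 0
    for n, i in enumerate(order, start=1):
        # round the probability to a 0/1 prediction, like np.around in the source
        n_correct += int(round(prob_pos[i]) == labels[i])
        uncertainty.append(std[i])
        cum_acc.append(n_correct / n)
    return uncertainty, cum_acc, order


uncertainty, cum_acc, order = cum_acc_sketch(
    prob_pos=[0.9, 0.2, 0.6, 0.4],
    std=[0.05, 0.30, 0.10, 0.20],
    labels=[1, 1, 1, 0],
)
```

Here the three most certain predictions are correct and the least certain one is wrong, so accuracy stays at 1.0 until the last step and then drops to 0.75.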

def cum_acc_by_uncertainty(mean_preds, std_preds, true_labels):
    """Returns uncertainty and cumulative accuracy for grouped class predictions,
    sorted in increasing order of uncertainty

    Params
    ------
    mean_preds: dict
        mean predictions, output from Vset.predict_with_uncertainties
    std_preds: dict
        std predictions, output from Vset.predict_with_uncertainties
    true_labels: dict or list-like

    TODO: generalize to multi-class classification
    """
    assert dict_keys(mean_preds) == dict_keys(
        std_preds
    ), "mean_preds and std_preds must share the same keys"
    # match predictions on keys
    paired_preds = [
        [d[k] for d in (mean_preds, std_preds)] for k in dict_keys(mean_preds)
    ]
    mean_preds, std_preds = (np.array(p)[:, :, 1] for p in zip(*paired_preds))
    if isinstance(true_labels, dict):
        true_labels = dict_data(true_labels)
        assert len(true_labels) == 1, "true_labels should have a single 1D vector entry"
        true_labels = true_labels[0]
    n_obs = len(mean_preds[0])
    assert (
        len(true_labels) == n_obs
    ), f"true_labels has {len(true_labels)} obs. but should have same as predictions ({n_obs})"
    sorted_idx = np.argsort(std_preds, axis=1)
    correct_labels = np.take_along_axis(
        np.around(mean_preds) - true_labels == 0, sorted_idx, 1
    )
    uncertainty = np.take_along_axis(std_preds, sorted_idx, 1)
    cum_acc = np.cumsum(correct_labels, axis=1) / range(1, n_obs + 1)
    return uncertainty, cum_acc, sorted_idx
def dict_data(d: dict)

Returns a list containing all data in dict d

def dict_data(d: dict):
    """Returns a list containing all data in dict d"""
    return list(base_dict(d).values())
def dict_keys(d: dict)

Returns a list containing all keys in dict d

def dict_keys(d: dict):
    """Returns a list containing all keys in dict d"""
    return list(base_dict(d).keys())
def dict_to_df(d: dict, param_key=None)

Converts a dictionary with tuple keys into a pandas DataFrame, optionally separating parameters in param_key if not None

Parameters

d : dict
Output dictionary with tuple keys from a Vset.
param_key : str (optional), default None
Name of parameter to separate into multiple columns.

Returns

df : pandas.DataFrame
A DataFrame with d tuple keys separated into columns.
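The core reshaping, one row per output with one column per origin plus an "out" column, can be sketched without pandas. This assumes subkeys reduced to hypothetical `(origin, value)` pairs and omits the "init-" column renaming and `param_key` splitting; `dict_to_rows` is an illustrative name.

```python
from typing import Dict, List, Tuple

# hypothetical key format: each subkey is an (origin, value) pair
Key = Tuple[Tuple[str, str], ...]


def dict_to_rows(d: Dict[Key, object]) -> List[Dict[str, object]]:
    """Turn {((origin, value), ...): out} into one row dict per entry."""
    rows = []
    for key, out in d.items():
        # column names come from each subkey's origin, as in dict_to_df
        row = {origin: value for origin, value in key}
        row["out"] = out
        rows.append(row)
    return rows


d = {
    (("init", "X_train"), ("modeling", "LR")): 0.9,
    (("init", "X_train"), ("modeling", "DT")): 0.8,
}
rows = dict_to_rows(d)
```

Passing such rows to `pandas.DataFrame(rows)` would give a table with columns `init`, `modeling`, and `out`.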
def dict_to_df(d: dict, param_key=None):
    """Converts a dictionary with tuple keys
    into a pandas DataFrame, optionally separating
    parameters in `param_key` if not None

    Parameters
    ----------
    d: dict
        Output dictionary with tuple keys from a Vset.
    param_key: str (optional), default None
        Name of parameter to separate into multiple columns.

    Returns
    -------
    df: pandas.DataFrame
        A DataFrame with `d` tuple keys separated into columns.
    """
    d_copy = {tuple(sk.value for sk in k): d[k] for k in d if k != PREV_KEY}
    df = pd.Series(d_copy).reset_index()
    if len(d_copy.keys()) > 0:
        key_list = list(d.keys())
        subkey_list = key_list[0] if key_list[0] != PREV_KEY else key_list[1]
        cols = [sk.origin for sk in subkey_list] + ["out"]
        # set each init col to init-{next_vfunc_set}
        cols = [
            c if c != "init" else init_step(idx, cols) for idx, c in enumerate(cols)
        ]
        df = df.set_axis(cols, axis=1)
        if param_key:
            param_keys = df[
                param_key
            ].tolist()  # pylint: disable=unsubscriptable-object
            if param_key == "out" and hasattr(param_keys[0], "__iter__"):
                param_df = pd.DataFrame(param_keys)
                param_df.columns = [f"{param_key}-{col}" for col in param_df.columns]
                df = df.join(param_df)
            else:
                param_loc = df.columns.get_loc(param_key)
                param_key_cols = [
                    f"{p.split('=')[0]}-{param_key}" for p in param_keys[0]
                ]
                param_keys = [[s.split("=")[1] for s in t] for t in param_keys]
                df = df.join(pd.DataFrame(param_keys)).drop(columns=param_key)
                new_cols = df.columns[: len(cols) - 1].tolist() + param_key_cols
                df = df.set_axis(new_cols, axis=1)
                new_idx = list(range(len(new_cols)))
                new_idx = (
                    new_idx[:param_loc]
                    + new_idx[len(cols) - 1 :]
                    + new_idx[param_loc : len(cols) - 1]
                )
                df = df.iloc[:, new_idx]
    return df
def filter_vset_by_metric(metric_dict: dict, vset: Vset, *vsets: Vset, n_keep: int = 1, bigger_is_better: bool = True, filter_on=None, group: bool = False) ‑> Union[Vset, list]

Returns a new Vset by filtering vset.vfuncs based on values in metric_dict.

Parameters

metric_dict : dict
Output from a Vset, typically with metrics or other numeric values to use when filtering vset.vfuncs.
vset : Vset
A Vset to filter.
*vsets : Vset
Zero or more additional Vsets to filter.
n_keep : int (optional)
Number of entries to keep from vset.vfuncs.
bigger_is_better : bool (optional)
If True, then the n_keep largest values are retained; otherwise, the n_keep smallest.
filter_on : list[str] (optional)
If there are multiple metrics in metric_dict, you can specify a subset to consider.
group : bool (optional)
If True, average metrics after grouping values in metric_dict by the input Vset names.

Returns

*new_vset : Vset
Copies of the input Vsets but with Vfuncs filtered based on metrics. A single Vset is returned if only one was passed; otherwise, a list of Vsets.
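The selection logic (optionally average per Vset entry, sort, keep the top `n_keep` rows, then keep every vfunc named in those rows) can be sketched with the standard library. This assumes metrics keyed by hypothetical `(subsample, vfunc_name)` pairs and handles a single Vset; `top_n_by_metric` is an illustrative name, not a vflow function.

```python
from collections import defaultdict
from statistics import mean
from typing import Dict, List, Tuple


def top_n_by_metric(
    metrics: Dict[Tuple[str, str], float],
    n_keep: int = 1,
    bigger_is_better: bool = True,
    group: bool = False,
) -> List[str]:
    """Return the names of the vfuncs that survive filtering, best first."""
    if group:
        # average each vfunc's metric across subsamples before ranking
        by_name = defaultdict(list)
        for (_, name), value in metrics.items():
            by_name[name].append(value)
        scored = [(name, mean(vals)) for name, vals in by_name.items()]
    else:
        scored = [(name, value) for (_, name), value in metrics.items()]
    scored.sort(key=lambda t: t[1], reverse=bigger_is_better)
    top_rows = scored[:n_keep]
    # several top rows may share a name; each vfunc is kept once
    return list(dict.fromkeys(name for name, _ in top_rows))


metrics = {
    ("sub0", "LR"): 0.80,
    ("sub1", "LR"): 0.90,
    ("sub0", "DT"): 0.88,
    ("sub1", "DT"): 0.86,
}
```

Without grouping, LR wins on its single best subsample; with `group=True`, DT wins on average. This shows why grouping matters when judging stability rather than a lucky split.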
def filter_vset_by_metric(
    metric_dict: dict,
    vset: Vset,
    *vsets: Vset,
    n_keep: int = 1,
    bigger_is_better: bool = True,
    filter_on=None,
    group: bool = False,
) -> Union[Vset, list]:
    """Returns a new Vset by filtering `vset.vfuncs` based on values in `metric_dict`.

    Parameters
    ----------
    metric_dict: dict
        output from a Vset, typically with metrics or other numeric values to use when
        filtering `vset.vfuncs`
    vset: Vset
        a Vset
    *vsets: Vset
        zero or more additional Vsets
    n_keep: int (optional)
        number of entries to keep from `vset.vfuncs`
    bigger_is_better: bool (optional)
        if True, then the top `n_keep` largest values are retained
    filter_on: list[str] (optional)
        if there are multiple metrics in `metric_dict`, you can specify a subset
        to consider
    group: bool (optional)
        if True, average metrics after grouping values in `metric_dict` by the
        input Vset names

    Returns
    -------
    *new_vset : Vset
        Copies of the input Vsets but with Vfuncs filtered based on metrics
    """
    if filter_on is None:
        filter_on = []
    df = dict_to_df(metric_dict)
    vsets = [vset, *vsets]
    vset_names = []
    for vset_i in vsets:
        if vset_i.name not in df.columns:
            raise ValueError(
                (
                    f"{vset_i.name} should be one "
                    "of the columns of dict_to_df(metric_dict)"
                )
            )
        vset_names.append(vset_i.name)
    if len(filter_on) > 0:
        filter_col = list(metric_dict.keys())[0][-1].origin
        df = df[df[filter_col].isin(filter_on)]
    if group:
        df = df.groupby(by=vset_names, as_index=False).mean(numeric_only=True)
    if bigger_is_better:
        df = df.sort_values(by="out", ascending=False)
    else:
        df = df.sort_values(by="out")
    df = df.iloc[0:n_keep]
    for i, vset_i in enumerate(vsets):
        vfuncs = vset_i.vfuncs
        vfunc_filter = [str(name) for name in df[vset_i.name].to_numpy()]
        new_vfuncs = {k: v for k, v in vfuncs.items() if str(v.name) in vfunc_filter}
        tracking_dir = None if vset_i._mlflow is None else mlflow.get_tracking_uri()
        new_vset = Vset(
            "filtered_" + vset_i.name,
            new_vfuncs,
            is_async=vset_i._async,
            output_matching=vset_i._output_matching,
            lazy=vset_i._lazy,
            cache_dir=vset_i._cache_dir,
            tracking_dir=tracking_dir,
        )
        setattr(
            new_vset,
            FILTER_PREV_KEY,
            (
                metric_dict[PREV_KEY],
                vset_i,
            ),
        )
        setattr(new_vset, PREV_KEY, getattr(new_vset, FILTER_PREV_KEY))
        vsets[i] = new_vset
    if len(vsets) == 1:
        return vsets[0]
    return vsets
def init_args(args_tuple: Union[tuple, list], names=None)

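Converts a tuple of arguments into a list of dicts, one per argument. Each dict maps a one-Subkey key with origin "init" to the argument, so the outputs can be passed directly to Vsets.

Parameters

args_tuple : tuple or list
Arguments to wrap, e.g. the output of train_test_split.
names : list-like (optional), default None
Given names for each of the arguments in the tuple. If None, every argument is named "start".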
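The wrapping performed here can be sketched with plain tuples in place of Subkey objects. `PREV` is a hypothetical stand-in for vflow's PREV_KEY, and `init_args_sketch` is an illustrative name; the real function builds `Subkey(name, "init")` objects instead of `(name, "init")` pairs.

```python
from typing import Dict, List, Optional, Sequence

PREV = "__prev__"  # hypothetical stand-in for vflow's PREV_KEY


def init_args_sketch(args: Sequence, names: Optional[Sequence[str]] = None) -> List[Dict]:
    """Wrap each argument in its own dict keyed by a one-element (name, "init") key."""
    if names is None:
        names = ["start"] * len(args)
    if len(names) != len(args):
        raise ValueError("names should be same length as args")
    # the PREV entry records that this data came from the pipeline's start
    return [{((name, "init"),): arg, PREV: ("init",)} for name, arg in zip(names, args)]


X_train, X_test = init_args_sketch(([1, 2], [3]), names=["X_train", "X_test"])
```

Each resulting dict holds exactly one data entry plus a pointer back to the start of the pipeline, which is what downstream Vsets and `build_graph` rely on.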
def init_args(args_tuple: Union[tuple, list], names=None):
    """Converts tuple of arguments to a list of dicts

    Parameters
    ----------
    names: list-like (optional), default None
        given names for each of the arguments in the tuple
    """
    if names is None:
        names = ["start"] * len(args_tuple)
    else:
        assert len(names) == len(
            args_tuple
        ), "names should be same length as args_tuple"
    output_dicts = []
    for i, _ in enumerate(args_tuple):
        output_dicts.append(
            {
                (Subkey(names[i], "init"),): args_tuple[i],
                PREV_KEY: ("init",),
            }
        )
    return output_dicts
def init_step(idx, cols)

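Helper function that finds the suffix for an 'init' column: returns "init-" followed by the first non-'init' column name at or after idx, or None if there is none.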

Parameters

idx : int
Index of 'init' column in cols.
cols : list[str]
List of column names.
def init_step(idx, cols):
    """Helper function to find init suffix
    in a column

    Parameters
    ----------
    idx: int
        Index of 'init' column in cols.
    cols: list[str]
        List of column names.
    """
    for i in range(idx, len(cols)):
        if cols[i] != "init":
            return "init-" + cols[i]
    return None
def perturbation_stats(data: Union[pandas.core.frame.DataFrame, dict], *group_by: str, wrt: str = 'out', func=None, prefix: str = None, split: bool = False)

Compute statistics for wrt in data, conditional on group_by

Parameters

data : Union[pandas.DataFrame, dict]
DataFrame, as from calling dict_to_df() on an output dict from a Vset, or the output dict itself.
*group_by : str
Vset names in data to group on. If none provided, treats everything as one big group.
wrt : str (optional)
Column name in data or dict_to_df(data) on which to compute statistics. Defaults to 'out', the values of the original Vset output dict.
func : function, str, list or dict (optional), default None
A list of functions or function names to use for computing statistics, analogous to the parameter of the same name in pandas.core.groupby.DataFrameGroupBy.aggregate. If None, defaults to ['count', 'mean', 'std'].
prefix : str (optional), default None
A string to prefix to new columns in output DataFrame. If None, uses the value of wrt.
split : bool (optional), default False
If True and wrt in data has list or numpy.ndarray entries, will attempt to split the entries into multiple columns for the output.

Returns

df : pandas.DataFrame
A DataFrame with summary statistics on wrt.
def perturbation_stats(
    data: Union[pd.DataFrame, dict],
    *group_by: str,
    wrt: str = "out",
    func=None,
    prefix: str = None,
    split: bool = False,
):
    """Compute statistics for `wrt` in `data`, conditional on `group_by`

    Parameters
    ----------
    data: Union[pandas.DataFrame, dict]
        DataFrame, as from calling `dict_to_df` on an output dict from a Vset,
        or the output dict itself.
    *group_by: str
        Vset names in `data` to group on. If none provided, treats everything as one big
        group.
    wrt: str (optional)
        Column name in `data` or `dict_to_df(data)` on which to compute statistics.
        Defaults to `'out'`, the values of the original Vset output dict.
    func: function, str, list or dict (optional), default None
        A list of functions or function names to use for computing
        statistics, analogous to the parameter of the same name in
        pandas.core.groupby.DataFrameGroupBy.aggregate. If `None`, defaults to
        `['count', 'mean', 'std']`.
    prefix: str (optional), default None
        A string to prefix to new columns in output DataFrame. If `None`,
        uses the value of `wrt`.
    split: bool (optional), default False
        If `True` and `wrt` in `data` has `list` or `numpy.ndarray` entries, will
        attempt to split the entries into multiple columns for the output.

    Returns
    -------
    df: pandas.DataFrame
        A DataFrame with summary statistics on `wrt`.
    """
    if func is None:
        func = ["count", "mean", "std"]
    if prefix is None:
        prefix = wrt
    if isinstance(data, dict):
        df = dict_to_df(data)
    else:
        df = data
    group_by = list(group_by)
    if len(group_by) > 0:
        gb = df.groupby(group_by)[wrt]
    else:
        gb = df.groupby(lambda x: True)[wrt]
    if (isinstance(func, list) and ("mean" in func or "std" in func)) and (
        type(df[wrt].iloc[0]) in [list, np.ndarray]
    ):
        wrt_arrays = [
            np.stack(d.tolist()) for d in (gb.get_group(grp) for grp in gb.groups)
        ]
        n_cols = wrt_arrays[0].shape[1]
        df_out = pd.DataFrame(gb.agg("count"))
        df_out.columns = [f"{prefix}-count"]
        if "mean" in func:
            if split:
                col_means = [arr.mean(axis=0) for arr in wrt_arrays]
                wrt_means = pd.DataFrame(
                    col_means,
                    columns=[f"{prefix}{i}-mean" for i in range(n_cols)],
                    index=gb.groups.keys(),
                )
            else:
                col_means = [{f"{prefix}-mean": arr.mean(axis=0)} for arr in wrt_arrays]
                wrt_means = pd.DataFrame(col_means, index=gb.groups.keys())
            wrt_means.index.names = df_out.index.names
            df_out = df_out.join(wrt_means)
        if "std" in func:
            if split:
                col_stds = [arr.std(axis=0, ddof=1) for arr in wrt_arrays]
                wrt_stds = pd.DataFrame(
                    col_stds,
                    columns=[f"{prefix}{i}-std" for i in range(n_cols)],
                    index=gb.groups.keys(),
                )
            else:
                col_stds = [
                    {f"{prefix}-std": arr.std(axis=0, ddof=1)} for arr in wrt_arrays
                ]
                wrt_stds = pd.DataFrame(col_stds, index=gb.groups.keys())
            wrt_stds.index.names = df_out.index.names
            df_out = df_out.join(wrt_stds)
        if "count" not in func:
            df_out = df_out.drop(columns=f"{prefix}-count")
    else:
        df_out = gb.agg(func)
    df_out = df_out.reindex(sorted(df_out.columns), axis=1)
    df_out.reset_index(inplace=True)
    if len(group_by) > 0:
        return df_out.sort_values(group_by[0])
    return df_out
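With the default func, perturbation_stats reports three values per group: the row count, the mean of wrt, and the sample standard deviation of wrt (pandas' std uses ddof=1). The standard-library sketch below reproduces only that default, scalar-valued case. The rows, the "model" column, and the helper perturbation_stats_sketch are hypothetical stand-ins for dict_to_df output and for the pandas groupby/aggregate call.

```python
import statistics
from collections import defaultdict

# Hypothetical rows, shaped like dict_to_df output: one row per pipeline path,
# a "model" column naming the perturbation and an "out" column with the metric.
rows = [
    {"model": "LR", "out": 0.80},
    {"model": "LR", "out": 0.84},
    {"model": "LR", "out": 0.82},
    {"model": "DT", "out": 0.70},
    {"model": "DT", "out": 0.76},
]


def perturbation_stats_sketch(rows, group_by, wrt="out"):
    """Count, mean and sample std of `wrt` for each value of `group_by`."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[group_by]].append(row[wrt])
    stats = {}
    for key, vals in sorted(groups.items()):
        stats[key] = {
            "count": len(vals),
            "mean": statistics.mean(vals),
            # stdev divides by n - 1, like pandas' default ddof=1;
            # pandas gives NaN for single-row groups, so this sketch does too
            "std": statistics.stdev(vals) if len(vals) > 1 else float("nan"),
        }
    return stats


for model, stats in perturbation_stats_sketch(rows, "model").items():
    print(model, stats)
```

A small std relative to the mean within each model group indicates that the metric is stable across the other perturbations.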
def sep_dicts(d: dict, n_out: int = 1, keys=None)

Splits a dictionary whose values are iterables into multiple dictionaries

Assumes every value has length n_out

Parameters

d : dict
Dictionary with iterable values to be converted.
n_out : int, default 1
The number of dictionaries to separate d into.
keys : list-like, default None
Optional list of keys to use in output dicts.

Returns

sep_dicts_list : list
List of separated dictionaries.

Examples

>>> sep_dicts({k1: (x1, y1), k2: (x2, y2), ...,  '__prev__': p})
[{k1: x1, k2: x2, ..., '__prev__': p}, {k1: y1, k2: y2, ..., '__prev__': p}]
def sep_dicts(d: dict, n_out: int = 1, keys=None):
    """Splits a dictionary whose values are iterables into multiple dictionaries

    Assumes every value has length n_out

    Parameters
    ----------
    d: dict
        Dictionary with iterable values to be converted.
    n_out: int, default 1
        The number of dictionaries to separate d into.
    keys: list-like, default None
        Optional list of keys to use in output dicts.

    Returns
    -------
    sep_dicts_list: list
        List of separated dictionaries.

    Examples
    --------
    >>> sep_dicts({k1: (x1, y1), k2: (x2, y2), ...,  '__prev__': p})
    [{k1: x1, k2: x2, ..., '__prev__': p}, {k1: y1, k2: y2, ..., '__prev__': p}]
    """
    if keys is None:
        keys = []
    if len(keys) > 0 and len(keys) != n_out:
        raise ValueError(f"keys should be empty or have length n_out={n_out}")
    # nothing to separate -- return d unchanged
    if n_out <= 1:
        return d
    # try separating dict into multiple dicts
    sep_dicts_id = str(uuid4())  # w/ high prob, uuid4 is unique
    sep_dicts_list = [{} for _ in range(n_out)]
    for key, value in d.items():
        if key != PREV_KEY:
            for i in range(n_out):
                # assumes the correct sub-key for item i is in the i-th position
                if len(keys) == 0:
                    new_key = (key[i],) + key[n_out:]
                else:
                    new_sub = Subkey(
                        value=keys[i], origin=key[-1].origin + "-" + str(i)
                    )
                    new_key = (new_sub,) + key
                new_key[-1].sep_dicts_id = sep_dicts_id
                if isinstance(value, VfuncPromise):
                    # return a promise to get the value at index i of the
                    # original promise
                    value_i = VfuncPromise(lambda v, x: v[x], value, i)
                else:
                    value_i = value[i]
                sep_dicts_list[i][new_key] = value_i

    return sep_dicts_list
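When keys is empty, sep_dicts builds each new key as (key[i],) + key[n_out:]. This keeps the i-th leading sub-key and everything after the first n_out sub-keys. The sketch below shows that rule with hypothetical plain-string sub-keys in place of Subkey objects. It leaves out the sep_dicts_id tagging and the VfuncPromise handling, and PREV_KEY = "__prev__" is an assumption.

```python
PREV_KEY = "__prev__"  # assumption: the value of vflow's PREV_KEY


def sep_dicts_sketch(d, n_out=1):
    """Split a dict of n_out-tuples into n_out dicts (string sub-keys only)."""
    if n_out <= 1:
        return d
    out = [{} for _ in range(n_out)]
    for key, value in d.items():
        if key == PREV_KEY:
            continue  # sep_dicts also skips PREV_KEY in this loop
        for i in range(n_out):
            # keep the i-th leading sub-key plus all trailing sub-keys
            new_key = (key[i],) + key[n_out:]
            out[i][new_key] = value[i]
    return out


# Hypothetical output of a resampling Vset applied to (X_train, y_train):
# the first two sub-keys name the inputs, the last one names the resample.
resampled = {
    ("X_train", "y_train", "subsampling_0"): ("X0", "y0"),
    ("X_train", "y_train", "subsampling_1"): ("X1", "y1"),
}
X_dict, y_dict = sep_dicts_sketch(resampled, n_out=2)
print(X_dict)
print(y_dict)
```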
def to_list(tup: tuple)

Convert from tuple to packed list

Allows us to call a function once per set of arguments in a loop

Parameters

tup : tuple
tuple of objects to convert to packed list

Raises

ValueError
If passed an odd number (greater than one) of arguments that are not lists. Please wrap your args in a list.

Examples

>>> to_list(([x1, x2, x3], [y1, y2, y3]))
[[x1, y1], [x2, y2], [x3, y3]]
>>> to_list(([x1], [y1]))
[[x1, y1]]
>>> to_list(([x1, x2, x3], ))
[[x1], [x2], [x3]]
>>> to_list((x1, ))
[[x1]]
>>> to_list((x1, y1))
[[x1, y1]]
>>> to_list((x1, x2, x3, y1, y2, y3))
[[x1, y1], [x2, y2], [x3, y3]]
def to_list(tup: tuple):
    """Convert from tuple to packed list

    Allows us to call a function once per set of arguments in a loop

    Parameters
    ----------
    tup: tuple
        tuple of objects to convert to packed list

    Raises
    ------
    ValueError
        If passed an odd number (greater than one) of arguments that are not
        lists. Please wrap your args in a list.

    Examples
    --------
    >>> to_list(([x1, x2, x3], [y1, y2, y3]))
    [[x1, y1], [x2, y2], [x3, y3]]
    >>> to_list(([x1], [y1]))
    [[x1, y1]]
    >>> to_list(([x1, x2, x3], ))
    [[x1], [x2], [x3]]
    >>> to_list((x1, ))
    [[x1]]
    >>> to_list((x1, y1))
    [[x1, y1]]
    >>> to_list((x1, x2, x3, y1, y2, y3))
    [[x1, y1], [x2, y2], [x3, y3]]
    """
    n_tup = len(tup)
    if n_tup == 0:
        return []
    if not isinstance(tup[0], list):
        # the first element is data
        if n_tup == 1:
            return [list(tup)]
        if n_tup % 2 != 0:
            raise ValueError(
                "Don't know how to handle uneven number of args "
                "without a list. Please wrap your args in a list."
            )
        # assume first half of args is input and second half is outcome
        return [list(el) for el in zip(tup[: (n_tup // 2)], tup[(n_tup // 2) :])]
    if n_tup == 1:
        return [[x] for x in tup[0]]
    n_mods = len(tup[0])
    lists_packed = [[] for _ in range(n_mods)]
    for i in range(n_mods):
        for j in range(n_tup):
            lists_packed[i].append(tup[j][i])
    return lists_packed
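to_list covers two cases. When the first argument is not a list, the first half of the arguments (inputs) is paired positionally with the second half (outcomes). When it is a list, the lists are transposed so that the i-th call receives the i-th element of each. The hypothetical to_list_sketch below expresses both cases with zip. It differs from to_list in one respect: when a later list is shorter than the first, this sketch truncates, whereas to_list raises an IndexError.

```python
def to_list_sketch(tup):
    """Pack arguments into per-call argument lists (simplified to_list)."""
    if len(tup) == 0:
        return []
    if not isinstance(tup[0], list):
        if len(tup) == 1:
            return [list(tup)]
        if len(tup) % 2 != 0:
            raise ValueError("odd number of non-list args; wrap them in a list")
        half = len(tup) // 2
        # first half are inputs, second half are the matching outcomes
        return [list(pair) for pair in zip(tup[:half], tup[half:])]
    # lists: transpose, so the i-th call gets the i-th element of every list
    return [list(column) for column in zip(*tup)]


print(to_list_sketch((["X1", "X2"], ["y1", "y2"])))
print(to_list_sketch(("X1", "X2", "y1", "y2")))
```

Both calls produce the same packing, which is why to_list can accept either form.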
def to_tuple(lists: list)

Convert from lists to unpacked tuple

Allows us to write X, y = to_tuple([[x1, y1], [x2, y2], [x3, y3]])

Parameters

lists : list
list of objects to convert to unpacked tuple

Examples

>>> to_tuple([[x1, y1], [x2, y2], [x3, y3]])
([x1, x2, x3], [y1, y2, y3])
>>> to_tuple([[x1, y1]])
[[x1, y1]]
>>> to_tuple([m1, m2, m3])
[m1, m2, m3]
def to_tuple(lists: list):
    """Convert from lists to unpacked tuple

    Allows us to write `X, y = to_tuple([[x1, y1], [x2, y2], [x3, y3]])`

    Parameters
    ----------
    lists: list
        list of objects to convert to unpacked tuple

    Examples
    --------
    >>> to_tuple([[x1, y1], [x2, y2], [x3, y3]])
    ([x1, x2, x3], [y1, y2, y3])
    >>> to_tuple([[x1, y1]])
    [[x1, y1]]
    >>> to_tuple([m1, m2, m3])
    [m1, m2, m3]
    """
    n_mods = len(lists)
    if n_mods <= 1:
        return lists
    if not isinstance(lists[0], list):
        return lists
    n_tup = len(lists[0])
    tup = [[] for _ in range(n_tup)]
    for i in range(n_mods):
        for j in range(n_tup):
            tup[j].append(lists[i][j])
    return tuple(tup)
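to_tuple performs the reverse transposition. It turns per-call lists back into one list per position so that results can be unpacked as X, y = .... In the hypothetical to_tuple_sketch below, zip(*lists) does the transposition. The early return preserves to_tuple's behaviour of passing through a single-element list, or a list whose items are not lists, unchanged.

```python
def to_tuple_sketch(lists):
    """Unpack per-call lists into one list per position (simplified to_tuple)."""
    if len(lists) <= 1 or not isinstance(lists[0], list):
        # like to_tuple: pass through single lists and lists of non-lists
        return lists
    # transpose: output j collects element j of every inner list
    return tuple(list(column) for column in zip(*lists))


packed = [["X1", "y1"], ["X2", "y2"], ["X3", "y3"]]
X, y = to_tuple_sketch(packed)
print(X, y)
```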

Classes

class AsyncVfunc (name: str = '', vfunc=lambda x: x)

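An asynchronous version of the Vfunc class. fit and transform submit the work as Ray remote tasks and return the corresponding object references instead of results.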

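class AsyncVfunc:
    """An asynchronous version of the Vfunc class.

    `fit` and `transform` submit the work as Ray remote tasks and return the
    corresponding object references instead of results.
    """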

    def __init__(self, name: str = "", vfunc=lambda x: x):
        self.name = name
        if isinstance(vfunc, Vfunc):
            self.vfunc = vfunc.vfunc
        else:
            assert hasattr(vfunc, "fit") or callable(
                vfunc
            ), "vfunc must be an object with a fit method or a callable"
            self.vfunc = vfunc

    def fit(self, *args, **kwargs):
        """This function fits params for this vfunc"""
        if hasattr(self.vfunc, "fit"):
            return _remote_fun.remote(self.vfunc.fit, *args, **kwargs)
        return _remote_fun.remote(self.vfunc, *args, **kwargs)

    def transform(self, *args, **kwargs):
        """This function transforms its input in some way"""
        if hasattr(self.vfunc, "transform"):
            return _remote_fun.remote(self.vfunc.transform, *args, **kwargs)
        return _remote_fun.remote(self.vfunc, *args, **kwargs)

    def __call__(self, *args, **kwargs):
        """Calling an AsyncVfunc is equivalent to calling its `fit` method"""
        return self.fit(*args, **kwargs)
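AsyncVfunc does not compute anything when called. It hands the work to Ray through _remote_fun.remote and returns a reference that must be resolved later (VfuncPromise._get_value does this with ray.get). The sketch below imitates that submit-now, resolve-later pattern with the standard library's concurrent.futures instead of Ray, so it runs without Ray installed. FutureVfunc and its executor argument are hypothetical.

```python
from concurrent.futures import Future, ThreadPoolExecutor


class FutureVfunc:
    """Hypothetical stand-in for AsyncVfunc that uses threads instead of Ray."""

    def __init__(self, executor, vfunc=lambda x: x):
        self.executor = executor
        self.vfunc = vfunc

    def fit(self, *args, **kwargs) -> Future:
        # prefer an estimator-style fit method, otherwise call the function
        fn = self.vfunc.fit if hasattr(self.vfunc, "fit") else self.vfunc
        return self.executor.submit(fn, *args, **kwargs)

    def __call__(self, *args, **kwargs) -> Future:
        return self.fit(*args, **kwargs)


with ThreadPoolExecutor(max_workers=2) as pool:
    # both calls return immediately with a Future; the work runs in the pool
    futures = [FutureVfunc(pool, f)(10) for f in (lambda x: x + 1, lambda x: x * 2)]
    results = [fut.result() for fut in futures]  # analogous to ray.get
print(results)
```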

Methods

def fit(self, *args, **kwargs)

This function fits params for this vfunc

def transform(self, *args, **kwargs)

This function transforms its input in some way

class PCSPipeline (steps=None, cache_dir=None)

Parameters

steps : list
a list of Vset instances
cache_dir : str, default=None
The directory to use as data store by joblib. If None, won't do caching.
class PCSPipeline:
    def __init__(self, steps=None, cache_dir=None):
        """
        Parameters
        ----------
        steps: list
            a list of Vset instances
        cache_dir: str, default=None
            The directory to use as data store by `joblib`. If None, won't do
            caching.
        """
        if steps is None:
            steps = []
        self.steps = steps
        # set up the cache
        self.memory = joblib.Memory(location=cache_dir)

    def run(self, *args, **kwargs):
        """Runs the pipeline"""
        run_step_cached = self.memory.cache(_run_step)
        for i, step in enumerate(self.steps):
            try:
                step_name = step.name
            except AttributeError:
                step_name = f"Step {i}"
            print(step_name)
            _, fitted_step = run_step_cached(step, *args, **kwargs)
            self.steps[i] = fitted_step

    def __getitem__(self, i):
        """Accesses ith step of pipeline"""
        return self.steps[i]

    def __len__(self):
        return len(self.steps)

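    def generate_names(self, as_pandas=True):
        """Generates a name for every combination of vfuncs across the steps

        Each vfunc is labelled by its index and the first 8 characters of its
        string representation. If `as_pandas` is True, returns a
        `pandas.DataFrame` with one column per step; otherwise returns a list
        of tuples of names prefixed by the step name.
        """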
        name_lists = []
        if as_pandas:
            for step in self.steps:
                name_lists.append([f"{i}_{str(mod)[:8]}" for i, mod in enumerate(step)])
            indexes = list(itertools.product(*name_lists))
            return pd.DataFrame(indexes, columns=[step.name for step in self.steps])
        for step in self.steps:
            name_lists.append(
                [f"{step.name}_{i}_{str(mod)[:8]}" for i, mod in enumerate(step)]
            )
        return list(itertools.product(*name_lists))
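generate_names enumerates every path through the pipeline with itertools.product, labelling each vfunc by its index and the first eight characters of str(vfunc). The sketch below reproduces the as_pandas=False branch. The steps are hypothetical (name, vfuncs) pairs standing in for Vset objects, and generate_names_sketch is not part of vflow.

```python
import itertools


def generate_names_sketch(steps):
    """One tuple of labels per combination of vfuncs across steps."""
    name_lists = [
        [f"{name}_{i}_{str(mod)[:8]}" for i, mod in enumerate(vfuncs)]
        for name, vfuncs in steps
    ]
    return list(itertools.product(*name_lists))


steps = [
    ("subsampling", ["resample", "resample"]),
    ("model", ["LogisticRegression()", "DecisionTreeClassifier()"]),
]
for path in generate_names_sketch(steps):
    print(path)
```

With two vfuncs in each of two steps, the product yields four paths. The count grows multiplicatively with every step added.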

Methods

def generate_names(self, as_pandas=True)

Generates a name for every combination of vfuncs across the steps. Each vfunc is labelled by its index and the first 8 characters of its string representation. If as_pandas is True, returns a pandas.DataFrame with one column per step; otherwise returns a list of tuples of names prefixed by the step name.

def run(self, *args, **kwargs)

Runs the pipeline

class Subkey (value, origin: str, output_matching: bool = False)

Parameters

value : Any
subkey value corresponding to a Vset vfunc
origin : str
name of the origin Vset of this Subkey
output_matching : bool (optional), default False
inherited from the Vset where the Subkey is created
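The matching rules implemented by matches and mismatches are easier to follow on concrete keys. The sketch below re-implements just those two predicates, plus is_matching, on a hypothetical dataclass, SubkeySketch. Here sep_dicts_id is set by hand, whereas in vflow it is assigned by sep_dicts.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class SubkeySketch:
    """Hypothetical stand-in for vflow's Subkey."""

    value: Any
    origin: str
    output_matching: bool = False
    sep_dicts_id: Optional[str] = None

    def is_matching(self):
        return self.output_matching or self.sep_dicts_id is not None

    def matches(self, other):
        same = self.value == other.value and self.origin == other.origin
        linked = self.sep_dicts_id == other.sep_dicts_id or (
            self.output_matching and other.output_matching
        )
        return self.is_matching() and other.is_matching() and same and linked

    def mismatches(self, other):
        either_om = self.output_matching or other.output_matching
        shared_id = (
            not either_om
            and self.sep_dicts_id is not None
            and self.sep_dicts_id == other.sep_dicts_id
        )
        conflict = self.origin == other.origin and self.value != other.value
        return (either_om or shared_id) and conflict


a0 = SubkeySketch("subsampling_0", "subsampling", output_matching=True)
a0_again = SubkeySketch("subsampling_0", "subsampling", output_matching=True)
a1 = SubkeySketch("subsampling_1", "subsampling", output_matching=True)
print(a0.matches(a0_again), a0.mismatches(a1))  # True True
```

Keys from a Vset without output matching, and never passed through sep_dicts, neither match nor mismatch. They are simply combined.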
class Subkey:
    def __init__(self, value, origin: str, output_matching: bool = False):
        """
        Parameters
        ----------
        value: Any
            subkey value corresponding to a Vset vfunc
        origin: str
            name of the origin Vset of this Subkey
        output_matching: bool (optional), default False
            inherited from the Vset where the Subkey is created
        """
        self.value = value
        self.origin = origin
        self.output_matching = output_matching
        # sep_dicts_id identifies the particular call to sep_dicts() that this
        # key's dictionary went through (if any).
        self.sep_dicts_id = None

    def is_matching(self):
        """Checks if subkey should be matched in other Vsets"""
        return self.output_matching or self.sep_dicts_id is not None

    def matches_sep_dict_id(self, other: object):
        """Helper to match Subkeys by their (non-None) `sep_dicts_id`"""
        if isinstance(other, self.__class__):
            return (
                self.sep_dicts_id is not None
                and self.sep_dicts_id == other.sep_dicts_id
            )
        return False

    def matches(self, other: object):
        """When Subkey matching is required, determines if this Subkey is compatible
        with another, meaning that both Subkeys are matching (see `is_matching`),
        their origins and values match, and either their `sep_dicts_id`s match or
        both Subkeys have `output_matching` True.
        """
        if isinstance(other, self.__class__):
            # they're both matching
            cond0 = self.is_matching() and other.is_matching()
            # value and origins match
            cond1 = self.value == other.value and self.origin == other.origin
            # sep_dicts_id matches
            cond2 = self.sep_dicts_id == other.sep_dicts_id or (
                self.output_matching and other.output_matching
            )
            return cond0 and cond1 and cond2
        return False

    def mismatches(self, other: object):
        """When Subkey matching is required, determines if this Subkey and another are
        a bad match, meaning the origins are the same but the values differ, and
        either:

        1. at least one of the two Subkeys has `output_matching` True, or
        2. neither has `output_matching` True, but both share the same non-None
           `sep_dicts_id`

        Returns True if `other` is not a Subkey.
        """
        if isinstance(other, self.__class__):
            # one of the two keys is output_matching
            cond0 = self.output_matching or other.output_matching
            # neither key is output_matching but sep_dict_ids not None and match
            cond1 = not cond0 and self.matches_sep_dict_id(other)
            # origins match and values mismatch
            cond2 = self.origin == other.origin and self.value != other.value
            return (cond0 or cond1) and cond2
        return True

    def __eq__(self, other: object):
        """Mainly used for testing purposes."""
        if isinstance(other, self.__class__):
            # value and origins match
            return self.value == other.value and self.origin == other.origin
        return False

    def __repr__(self):
        return str(self.value)

    def __hash__(self):
        """Hash consistent with `__eq__`, which compares value and origin only."""
        return hash((self.value, self.origin))

Methods

def is_matching(self)

Checks if subkey should be matched in other Vsets

def matches(self, other: object)

When Subkey matching is required, determines if this Subkey is compatible with another, meaning that both Subkeys are matching (see is_matching), their origins and values match, and either their sep_dicts_id values match or both Subkeys have output_matching True.

def matches_sep_dict_id(self, other: object)

Helper to match Subkeys by their (non-None) sep_dicts_id

def mismatches(self, other: object)

When Subkey matching is required, determines if this Subkey and another are a bad match, meaning the origins are the same but the values differ, and either:

  1. at least one of the two Subkeys has output_matching True, or
  2. neither has output_matching True, but both share the same non-None sep_dicts_id

Returns True if other is not a Subkey.
class Vfunc (name: str = '', vfunc=lambda x: x)

Vfunc is a function together with a name attribute. The wrapped object may be an object with a "fit" method (and optionally a "transform" method) or simply a callable; fit and transform fall back to calling it directly when the corresponding method is missing.

class Vfunc:
    """Vfunc is a function together with a name attribute.

    The wrapped object may be an object with a "fit" method (and optionally a
    "transform" method) or simply a callable; `fit` and `transform` fall back to
    calling it directly when the corresponding method is missing.
    """

    def __init__(self, name: str = "", vfunc=lambda x: x):
        assert hasattr(vfunc, "fit") or callable(
            vfunc
        ), "vfunc must be an object with a fit method or a callable"
        self.name = name
        self.vfunc = vfunc

    def fit(self, *args, **kwargs):
        """This function fits params for this vfunc"""
        if hasattr(self.vfunc, "fit"):
            return self.vfunc.fit(*args, **kwargs)
        return self.vfunc(*args, **kwargs)

    def transform(self, *args, **kwargs):
        """This function transforms its input in some way"""
        if hasattr(self.vfunc, "transform"):
            return self.vfunc.transform(*args, **kwargs)
        return self.vfunc(*args, **kwargs)

    def __call__(self, *args, **kwargs):
        """Calling a Vfunc is equivalent to calling its `fit` method"""
        return self.fit(*args, **kwargs)
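Calling a Vfunc is the same as calling its fit method. fit uses the wrapped object's own fit when there is one and otherwise calls the object directly; transform falls back the same way. VfuncSketch below is a trimmed, hypothetical copy of that dispatch. It is exercised with a hypothetical scikit-learn-style MeanCenterer and with a plain lambda.

```python
class VfuncSketch:
    """Trimmed copy of Vfunc's dispatch logic (hypothetical name)."""

    def __init__(self, name="", vfunc=lambda x: x):
        assert hasattr(vfunc, "fit") or callable(vfunc)
        self.name = name
        self.vfunc = vfunc

    def fit(self, *args, **kwargs):
        if hasattr(self.vfunc, "fit"):
            return self.vfunc.fit(*args, **kwargs)
        return self.vfunc(*args, **kwargs)

    def transform(self, *args, **kwargs):
        if hasattr(self.vfunc, "transform"):
            return self.vfunc.transform(*args, **kwargs)
        return self.vfunc(*args, **kwargs)

    def __call__(self, *args, **kwargs):
        return self.fit(*args, **kwargs)


class MeanCenterer:
    """Hypothetical estimator with scikit-learn style fit/transform."""

    def fit(self, xs):
        self.mean_ = sum(xs) / len(xs)
        return self  # returning self mirrors scikit-learn's convention

    def transform(self, xs):
        return [x - self.mean_ for x in xs]


centerer = VfuncSketch("center", MeanCenterer())
fitted = centerer([1.0, 2.0, 3.0])  # dispatches to MeanCenterer.fit
print(fitted.transform([4.0]))  # [2.0]
doubler = VfuncSketch("double", lambda x: 2 * x)
print(doubler(5), doubler.transform(5))  # no fit/transform: both call the lambda
```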

Methods

def fit(self, *args, **kwargs)

This function fits params for this vfunc

def transform(self, *args, **kwargs)

This function transforms its input in some way

class VfuncPromise (vfunc: callable, *args)

A Vfunc promise used for lazy evaluation.

class VfuncPromise:
    """A Vfunc promise used for lazy evaluation."""

    def __init__(self, vfunc: callable, *args):
        self.vfunc = vfunc
        self.args = args
        self.called = False
        self.value = None

    def __call__(self):
        """Evaluates the promise (resolving any nested promises first) and caches the result"""
        if self.called:
            return self.value
        tmp_args = []
        for i, arg in enumerate(self.args):
            tmp_args.append(arg)
            while isinstance(tmp_args[i], VfuncPromise):
                tmp_args[i] = tmp_args[i]()
        while isinstance(self.vfunc, VfuncPromise):
            self.vfunc = self.vfunc()
        self.value = self.vfunc(*tmp_args)
        self.called = True
        return self.value

    def _get_value(self):
        if isinstance(self(), ray.ObjectRef):
            self.value = ray.get(self.value)
        return self.value

    def transform(self, *args):
        """This function transforms its input in some way"""
        return self._get_value().transform(*args)

    def predict(self, *args):
        """This function calls predict on its inputs"""
        return self._get_value().predict(*args)

    def predict_proba(self, *args):
        """This function calls predict_proba on its inputs"""
        return self._get_value().predict_proba(*args)

    def __repr__(self):
        if self.called:
            return f"Fulfilled VfuncPromise({self.value})"
        return f"Unfulfilled VfuncPromise(func={self.vfunc}, args={self.args})"
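VfuncPromise postpones a call until the promise itself is called. At that point it resolves any promises among its arguments first, then caches the result. The stripped-down PromiseSketch below is hypothetical and leaves out the Ray-specific _get_value. It counts how often the wrapped function runs, which makes both the laziness and the caching visible.

```python
class PromiseSketch:
    """Minimal lazy, memoised call in the spirit of VfuncPromise."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args
        self.called = False
        self.value = None

    def __call__(self):
        if self.called:
            return self.value  # cached result, no recomputation
        args = []
        for arg in self.args:
            # resolve nested promises (possibly several levels deep) first
            while isinstance(arg, PromiseSketch):
                arg = arg()
            args.append(arg)
        self.value = self.func(*args)
        self.called = True
        return self.value


calls = []


def expensive_square(x):
    calls.append(x)
    return x * x


squared = PromiseSketch(expensive_square, 4)
plus_one = PromiseSketch(lambda v: v + 1, squared)  # depends on another promise
print(calls)  # [] -- nothing has run yet
print(plus_one())  # 17
print(plus_one(), squared(), calls)  # 17 16 [4] -- served from the cache
```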

Methods

def predict(self, *args)

This function calls predict on its inputs

def predict_proba(self, *args)

This function calls predict_proba on its inputs

def transform(self, *args)

This function transforms its input in some way

class Vset (name: str, vfuncs, vfunc_keys: list = None, is_async: bool = False, output_matching: bool = False, lazy: bool = False, cache_dir: str = None, tracking_dir: str = None)

Parameters

name : str
Name of this Vset.
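vfuncs : list or dict
List or dictionary of the functions (or objects with a fit method) in this Vset. If a dict, its keys are used as the vfunc keys.
vfunc_keys : list (optional)
List of names corresponding to each vfunc; used only when vfuncs is a list. If omitted, names default to "{name}_{i}".
is_async : bool (optional)
If True, vfuncs are computed asynchronously using Ray. Set automatically if any of the vfuncs is an AsyncVfunc.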
output_matching : bool (optional)
If True, then output keys from this Vset will be matched when used in other Vsets
lazy : bool (optional)
If True, then vfuncs are evaluated lazily, i.e. outputs are vset.vfunc.VfuncPromise
cache_dir : str (optional)
If provided, do caching and use cache_dir as the data store for joblib.Memory.
tracking_dir : str (optional)
If provided, use the mlflow.tracking api to log outputs as metrics with params determined by input keys.
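Suppose vfuncs is a list and vfunc_keys is omitted. Each vfunc is then keyed by a one-element tuple whose Subkey value is "{name}_{i}". Applying the Vset pairs every input key with every vfunc key, which the _apply_func docstring calls cartesian matching. The sketch below illustrates that naming and pairing with plain strings instead of Subkey objects. It ignores output matching, caching, Ray, and MLflow, and default_keys and apply_cartesian are hypothetical helpers.

```python
import itertools


def default_keys(name, vfuncs):
    """Singleton-tuple keys like Vset's defaults, with strings for Subkeys."""
    return [(f"{name}_{i}",) for i in range(len(vfuncs))]


def apply_cartesian(vfunc_dict, data_dict):
    """Apply every vfunc to every input; output keys concatenate both keys."""
    return {
        data_key + vfunc_key: vfunc(data)
        for (data_key, data), (vfunc_key, vfunc) in itertools.product(
            data_dict.items(), vfunc_dict.items()
        )
    }


vfuncs = [sum, max]
vfunc_dict = dict(zip(default_keys("stat", vfuncs), vfuncs))
data = {("train_1",): [1, 2, 3], ("train_2",): [4, 5]}
print(apply_cartesian(vfunc_dict, data))
```

Two inputs and two vfuncs give four outputs. Each output key records the path that produced it, which is what later lets results be grouped by perturbation.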
class Vset:
    def __init__(
        self,
        name: str,
        vfuncs,
        vfunc_keys: list = None,
        is_async: bool = False,
        output_matching: bool = False,
        lazy: bool = False,
        cache_dir: str = None,
        tracking_dir: str = None,
    ):
        """
        Parameters
        ----------
        name: str
            Name of this Vset.
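        vfuncs: list or dict
            List or dictionary of the functions (or objects with a `fit` method)
            in this Vset. If a dict, its keys are used as the vfunc keys.
        vfunc_keys: list (optional)
            List of names corresponding to each vfunc; used only when `vfuncs` is
            a list. If omitted, names default to `f"{name}_{i}"`.
        is_async: bool (optional)
            If True, `vfuncs` are computed asynchronously using Ray. Set
            automatically if any of the vfuncs is an `AsyncVfunc`.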
        output_matching: bool (optional)
            If True, then output keys from this Vset will be matched when used
            in other Vsets
        lazy: bool (optional)
            If True, then vfuncs are evaluated lazily, i.e. outputs are `vset.vfunc.VfuncPromise`
        cache_dir: str (optional)
            If provided, do caching and use `cache_dir` as the data store for
            `joblib.Memory`.
        tracking_dir: str (optional)
            If provided, use the `mlflow.tracking` api to log outputs as metrics
            with params determined by input keys.

        """
        self.name = name
        self._fitted = False
        self.fitted_vfuncs = None  # outputs
        self._async = is_async
        self._output_matching = output_matching
        self._lazy = lazy
        self._cache_dir = cache_dir
        self._memory = joblib.Memory(self._cache_dir)
        if tracking_dir is not None:
            self._mlflow = MlflowClient(tracking_uri=tracking_dir)
            experiment = self._mlflow.get_experiment_by_name(name=self.name)
            if experiment is None:
                self._exp_id = self._mlflow.create_experiment(name=self.name)
            else:
                self._exp_id = experiment.experiment_id
        else:
            self._mlflow = None
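        # check if any of the vfuncs are AsyncVfuncs (for a dict, check its values);
        # if so, we'll make them all AsyncVfuncs later on
        vfunc_list = vfuncs.values() if isinstance(vfuncs, dict) else vfuncs
        if not self._async and any(isinstance(vf, AsyncVfunc) for vf in vfunc_list):
            self._async = True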
        if isinstance(vfuncs, dict):
            self.vfuncs = vfuncs
        elif isinstance(vfuncs, list):
            if vfunc_keys is not None:
                assert isinstance(
                    vfunc_keys, list
                ), "vfuncs passed as list but vfunc_keys is not a list"
                assert len(vfuncs) == len(
                    vfunc_keys
                ), "vfuncs list and vfunc_keys list do not have the same length"
                # TODO: how best to handle tuple subkeys?
                vfunc_keys = [(self.__create_subkey(k),) for k in vfunc_keys]
            else:
                vfunc_keys = [
                    (self.__create_subkey(f"{name}_{i}"),) for i in range(len(vfuncs))
                ]
            # convert vfunc keys to singleton tuples
            self.vfuncs = dict(zip(vfunc_keys, vfuncs))
        # if needed, wrap the vfuncs in the Vfunc or AsyncVfunc class
        for k, v in self.vfuncs.items():
            if self._async:
                if not isinstance(v, AsyncVfunc):
                    self.vfuncs[k] = AsyncVfunc(k[0], v)
            elif not isinstance(v, Vfunc):
                self.vfuncs[k] = Vfunc(k[0], v)

    def _apply_func(self, *args, out_dict: dict = None):
        """Apply functions in out_dict to combined args dict

        Optionally logs output Subkeys and values as params and metrics using
        `mlflow.tracking` if this Vset was created with a `tracking_dir`.

        Parameters
        ----------
        *args: dict
            Takes multiple dicts and combines them into one.
            Then runs vfuncs on each item in combined dict.
        out_dict: dict (optional), default None
            The dictionary to pass to the matching function. If None, defaults to self.vfuncs.

        Returns
        -------
        out_dict: dict
            Dictionary with items being determined by functions in vfunc set.
            Functions and input dictionaries are currently matched using a cartesian matching format.

        Examples
        --------
        >>> vfuncs, data = {LR : logistic}, {train_1 : [X1,y1], train_2 : [X2,y2]}
        {(train_1, LR) : fitted logistic, (train_2, LR) :  fitted logistic}
        """
        if out_dict is None:
            out_dict = deepcopy(self.vfuncs)

        apply_func_cached = self._memory.cache(_apply_func_cached)
        out_dict = apply_func_cached(out_dict, self._async, self._lazy, *args)

        prev = tuple()
        for arg in args:
            if PREV_KEY in arg:
                prev += (arg[PREV_KEY],)
        out_dict[PREV_KEY] = (self,) + prev

        if self._mlflow is not None:
            run_dict = {}
            # log subkeys as params and value as metric
            for k, v in out_dict.items():
                if k == PREV_KEY:
                    continue
                origins = np.array([subk.origin for subk in k])
                # ignore init origins and the last origin (this Vset)
                param_idx = [i for i in range(len(k[:-1])) if origins[i] != "init"]
                # get or create mlflow run
                run_dict_key = tuple(subk.value for subk in k[:-1])
                if run_dict_key in run_dict:
                    run_id = run_dict[run_dict_key]
                else:
                    run = self._mlflow.create_run(self._exp_id)
                    run_id = run.info.run_id
                    run_dict[run_dict_key] = run_id
                    # log params
                    for idx in param_idx:
                        subkey = k[idx]
                        param_name = subkey.origin
                        # check if the origin occurs multiple times
                        if np.sum(origins == param_name) > 1:
                            occurrence = np.sum(origins[:idx] == param_name)
                            param_name = param_name + str(occurrence)
                        # log every non-init param, not only the duplicated ones
                        self._mlflow.log_param(run_id, param_name, subkey.value)
                self._mlflow.log_metric(run_id, k[-1].value, v)
        return out_dict

    def fit(self, *args):
        """Fits to args using `_apply_func`"""
        out_dict = {}
        for k, v in self.vfuncs.items():
            out_dict[k] = v.fit
        self.fitted_vfuncs = self._apply_func(*args, out_dict=out_dict)
        prev = self.fitted_vfuncs[PREV_KEY][1:]
        if hasattr(self, FILTER_PREV_KEY):
            prev = getattr(self, FILTER_PREV_KEY) + prev
        setattr(self, PREV_KEY, prev)
        self._fitted = True
        return self

    def fit_transform(self, *args):
        """Fits to args and transforms only the first arg."""
        return self.fit(*args).transform(args[0])

    def transform(self, *args):
        """Transforms args using `_apply_func`"""
        if not self._fitted:
            raise AttributeError(
                "Please fit the Vset object before calling the transform method."
            )
        out_dict = {}
        for k, v in self.fitted_vfuncs.items():
            if hasattr(v, "transform"):
                out_dict[k] = v.transform
        return self._apply_func(*args, out_dict=out_dict)

    def predict(self, *args, with_uncertainty: bool = False, group_by: list = None):
        """Predicts args using `_apply_func`"""
        if not self._fitted:
            raise AttributeError("Please fit the Vset object before calling predict.")
        pred_dict = {}
        for k, v in self.fitted_vfuncs.items():
            if hasattr(v, "predict"):
                pred_dict[k] = v.predict
        preds = self._apply_func(*args, out_dict=pred_dict)
        if with_uncertainty:
            return prediction_uncertainty(preds, group_by)
        return preds

    def predict_proba(
        self, *args, with_uncertainty: bool = False, group_by: list = None
    ):
        """Calls predict_proba on args using `_apply_func`"""
        if not self._fitted:
            raise AttributeError(
                "Please fit the Vset object before calling predict_proba."
            )
        pred_dict = {}
        for k, v in self.fitted_vfuncs.items():
            if hasattr(v, "predict_proba"):
                pred_dict[k] = v.predict_proba
        preds = self._apply_func(*args, out_dict=pred_dict)
        if with_uncertainty:
            return prediction_uncertainty(preds, group_by)
        return preds

    def evaluate(self, *args):
        """Combines dicts before calling `_apply_func`"""
        return self._apply_func(*args)

    def __call__(self, *args, n_out: int = None, keys=None, **kwargs):
        """Calls args using `_apply_func`, optionally separating the
        output dictionary into `n_out` dictionaries with `keys`
        """
        if keys is None:
            keys = []
        if n_out is None:
            n_out = len(args)
        out_dict = self._apply_func(*args)
        if n_out == 1:
            return out_dict
        out_dicts = sep_dicts(out_dict, n_out=n_out, keys=keys)
        # add back prev
        prev = out_dict[PREV_KEY]
        for i in range(n_out):
            if n_out == len(args):
                out_dicts[i][PREV_KEY] = (prev[0],) + (prev[i + 1],)
            else:
                out_dicts[i][PREV_KEY] = prev
        return out_dicts

    def __getitem__(self, i):
        """Accesses the vfunc stored under key `i` in the vfunc set"""
        return self.vfuncs[i]

    def __contains__(self, key):
        """Returns True if vfuncs is a dict and key is one of its keys"""
        if isinstance(self.vfuncs, dict):
            return key in self.vfuncs.keys()
        return False

    def keys(self):
        """Returns Vset vfunc keys"""
        if isinstance(self.vfuncs, dict):
            return self.vfuncs.keys()
        return {}.keys()

    def __len__(self):
        return len(self.vfuncs)

    def __str__(self):
        return "Vset(" + self.name + ")"

    def __create_subkey(self, value):
        """Helper function to construct `Subkey` with
        this Vset determining origin and output_matching
        """
        return Subkey(value, self.name, self._output_matching)
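
The `_apply_func` docstring describes how functions and input dictionaries are matched in cartesian fashion: every vfunc is applied to every entry of the combined input dict, and the output key joins the input key with the function key. A minimal sketch of that matching, assuming plain string tuples in place of vflow's `Subkey` objects and ignoring output matching, caching, asynchronous execution, and `PREV_KEY` bookkeeping (`cartesian_apply` is a hypothetical helper, not part of vflow):

```python
from itertools import product


def cartesian_apply(vfuncs, data):
    """Apply every function in `vfuncs` to every entry in `data`.

    Keys are tuples; each output key is the data key followed by the
    function key, mirroring the `(train_1, LR)` keys in the docstring.
    """
    out = {}
    for (data_key, args), (func_key, func) in product(data.items(), vfuncs.items()):
        # unpack the stored arguments (e.g. [X, y]) into the function call
        out[data_key + func_key] = func(*args)
    return out


# Toy stand-ins: a "model" that simply sums its inputs
vfuncs = {("LR",): lambda X, y: sum(X) + sum(y)}
data = {("train_1",): ([1, 2], [0, 1]), ("train_2",): ([3, 4], [1, 1])}
print(cartesian_apply(vfuncs, data))
# {('train_1', 'LR'): 4, ('train_2', 'LR'): 9}
```

With two input entries and one function the output has two keys; adding a second function would double it, so the number of results grows multiplicatively with each perturbed Vset in the pipeline.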

Methods

def evaluate(self, *args)

Combines dicts before calling _apply_func
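
`evaluate` passes its arguments straight to `_apply_func`. When the Vset has an MLflow client, `_apply_func` logs the origin of each Subkey in an output key as a run parameter, skipping `init` origins and the last Subkey (whose value becomes the metric name). If an origin appears more than once in a key, a suffix counting its earlier occurrences keeps the parameter names distinct. The sketch below reproduces only that naming rule, assuming the origins arrive as a plain list of strings and using list counts instead of NumPy; `param_names` is a hypothetical helper and the origin list is made up for illustration:

```python
def param_names(origins):
    """Return {index: param_name} for the origins that would be logged.

    The last origin belongs to the current Vset and is skipped, as are
    "init" origins; duplicated origins get an occurrence-count suffix.
    """
    names = {}
    for idx, origin in enumerate(origins[:-1]):
        if origin == "init":
            continue  # data created by init_args is not logged as a param
        name = origin
        if origins.count(origin) > 1:
            # number of times this origin appeared before position idx
            name = origin + str(origins[:idx].count(origin))
        names[idx] = name
    return names


print(param_names(["init", "subsampling", "modeling", "subsampling", "metrics"]))
# {1: 'subsampling0', 2: 'modeling', 3: 'subsampling1'}
```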

def fit(self, *args)

Fits to args using _apply_func
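
`fit` stores one fitted object per output key in `fitted_vfuncs`, sets `_fitted`, and returns `self`, so that `transform`, `predict`, and `predict_proba` can refuse to run before fitting. A stripped-down sketch of that lifecycle, assuming a single training dict and a toy model; `MeanModel` and `ToySet` are hypothetical, and unlike a real Vset, `ToySet.predict` does not cross the fitted keys with the keys of the test data:

```python
import copy


class MeanModel:
    """Toy model (hypothetical): predicts the mean of the training targets."""

    def fit(self, X, y):
        self.mean_ = sum(y) / len(y)
        return self  # returning self lets the caller keep the fitted model

    def predict(self, X):
        return [self.mean_ for _ in X]


class ToySet:
    """Hypothetical stand-in for the fit/predict lifecycle of a Vset."""

    def __init__(self, models):
        self.vfuncs = models  # {model_key: unfitted model}
        self._fitted = False

    def fit(self, data):
        # data: {data_key: (X, y)}; fit a fresh copy of every model on every
        # dataset so that two keys never share one fitted instance
        self.fitted_vfuncs = {
            data_key + model_key: copy.deepcopy(model).fit(X, y)
            for data_key, (X, y) in data.items()
            for model_key, model in self.vfuncs.items()
        }
        self._fitted = True
        return self

    def predict(self, X):
        if not self._fitted:
            raise AttributeError("Please fit the ToySet object before calling predict.")
        return {key: model.predict(X) for key, model in self.fitted_vfuncs.items()}


data = {("train_1",): ([0, 1], [2, 4]), ("train_2",): ([0, 1], [6, 8])}
toy = ToySet({("mean",): MeanModel()}).fit(data)
print(toy.predict([5]))
# {('train_1', 'mean'): [3.0], ('train_2', 'mean'): [7.0]}
```

Copying each model before fitting is a choice made for this sketch; it keeps the two training sets from overwriting a single shared instance.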

def fit_transform(self, *args)

Fits to args and transforms only the first arg.

def keys(self)

Returns Vset vfunc keys
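
When `vfuncs` is passed as a list without `vfunc_keys`, the constructor names each vfunc `f"{name}_{i}"` and wraps the resulting `Subkey` in a singleton tuple; `keys()` returns these tuples. The sketch below rebuilds that naming scheme with a `namedtuple` standing in for vflow's `Subkey` (the field names `value` and `origin` match the attributes read in `_apply_func`; `output_matching` as a field name is an assumption, and `default_keys` is hypothetical):

```python
from collections import namedtuple

# Hypothetical stand-in for vflow's Subkey(value, origin, output_matching)
Subkey = namedtuple("Subkey", ["value", "origin", "output_matching"])


def default_keys(name, vfuncs, output_matching=False):
    """Build one singleton-tuple key per vfunc, named f"{name}_{i}"."""
    return [
        (Subkey(f"{name}_{i}", name, output_matching),) for i in range(len(vfuncs))
    ]


keys = default_keys("modeling", ["lr", "dt"])
print([k[0].value for k in keys])
# ['modeling_0', 'modeling_1']
```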

def predict(self, *args, with_uncertainty: bool = False, group_by: list = None)

Predicts args using _apply_func
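
With `with_uncertainty=True`, `predict` hands its prediction dict to `prediction_uncertainty(preds, group_by)` instead of returning it directly. That function's exact output is not shown on this page, so the sketch below is only one plausible summary: it assumes the goal is a per-sample mean and population standard deviation across all perturbations and ignores `group_by`; `summarize_predictions` is hypothetical:

```python
from statistics import mean, pstdev


def summarize_predictions(preds):
    """Per-sample mean and standard deviation across perturbed predictions.

    preds: {key: [prediction for each sample]}, all lists the same length.
    """
    columns = list(zip(*preds.values()))  # one tuple per sample, across keys
    return [mean(c) for c in columns], [pstdev(c) for c in columns]


preds = {
    ("train_1", "LR"): [0.0, 1.0, 1.0],
    ("train_2", "LR"): [0.0, 1.0, 0.0],
}
means, stds = summarize_predictions(preds)
print(means, stds)
# [0.0, 1.0, 0.5] [0.0, 0.0, 0.5]
```

Samples where the perturbed pipelines disagree (the third one here) show a nonzero spread, which is the kind of instability the PCS framework asks you to inspect.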

def predict_proba(self, *args, with_uncertainty: bool = False, group_by: list = None)

Calls predict_proba on args using _apply_func
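
`predict_proba` (like `predict` and `transform`) only collects fitted vfuncs that expose the requested method, using `hasattr`; vfuncs without it are silently dropped rather than raising an error. A minimal illustration of that filter with two hypothetical toy classes:

```python
class HardClassifier:
    """Hypothetical classifier with predict but no predict_proba."""

    def predict(self, X):
        return [1 for _ in X]


class SoftClassifier:
    """Hypothetical classifier exposing both predict and predict_proba."""

    def predict(self, X):
        return [1 for _ in X]

    def predict_proba(self, X):
        return [[0.2, 0.8] for _ in X]


fitted = {("soft",): SoftClassifier(), ("hard",): HardClassifier()}

# Keep only the bound predict_proba methods, mirroring the hasattr check
proba_funcs = {
    k: v.predict_proba for k, v in fitted.items() if hasattr(v, "predict_proba")
}
print(list(proba_funcs))
# [('soft',)]
print(proba_funcs[("soft",)]([10, 20]))
# [[0.2, 0.8], [0.2, 0.8]]
```

Because the filter is silent, a Vset whose models all lack `predict_proba` yields an output dict with no prediction entries, so it is worth checking the returned keys.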

def transform(self, *args)

Transforms args using _apply_func
