Module src.neural_networks

Expand source code
import torch
from torch import nn, optim
from torch.nn import functional as F
import numpy as np
from tqdm import tqdm
import torch.utils.data as data_utils
from features import downsample
import pickle as pkl
import models
   
class neural_net_sklearn():
    
    """
    sklearn wrapper for training a neural net
    """
    
    def __init__(self, D_in=40, H=40, p=17, epochs=1000, batch_size=100, track_name='X_same_length_normalized', arch='fcnn', torch_seed=2):
        
        """
        Parameters:
        ==========================================================
            D_in, H, p: int
                same as input to FCNN
                
            epochs: int
                number of epochs
                
            batch_size: int
                batch size
                
            track_name: str
                column name of track (the tracks should be of the same length)
        """
        
        torch.manual_seed(torch_seed)
        self.D_in = D_in
        self.H = H
        self.p = p
        self.epochs = epochs
        self.batch_size = batch_size
        self.track_name = track_name
        self.torch_seed = torch_seed
        self.arch = arch
        
        torch.manual_seed(self.torch_seed)
        if self.arch == 'fcnn':
            self.model = models.FCNN(self.D_in, self.H, self.p)
        elif 'lstm' in self.arch:
            self.model = models.LSTMNet(self.D_in, self.H, self.p)
        elif 'cnn' in self.arch:
            self.model = models.CNN(self.D_in, self.H, self.p)
        elif 'attention' in self.arch:
            self.model = models.AttentionNet(self.D_in, self.H, self.p)   
        elif 'video' in self.arch:
            self.model = models.VideoNet()


    def fit(self, X, y, verbose=False, checkpoint_fname=None, device='cpu'):
        
        """
        Train model
        
        Parameters:
        ==========================================================
            X: pd.DataFrame
                input data, should contain tracks and additional covariates
                
            y: np.array
                input response
        """        
        
        torch.manual_seed(self.torch_seed)
        if self.arch == 'fcnn':
            self.model = models.FCNN(self.D_in, self.H, self.p)
        elif 'lstm' in self.arch:
            self.model = models.LSTMNet(self.D_in, self.H, self.p)
        elif 'cnn' in self.arch:
            self.model = models.CNN(self.D_in, self.H, self.p)
        elif 'attention' in self.arch:
            self.model = models.AttentionNet(self.D_in, self.H, self.p)   
        elif 'video' in self.arch:
            self.model = models.VideoNet()
        
        # convert input dataframe to tensors
        X_track = X[self.track_name] # track
        X_track = torch.tensor(np.array(list(X_track.values)), dtype=torch.float)
        
        if len(X.columns) > 1: # covariates
            X_covariates = X[[c for c in X.columns if c != self.track_name]]
            X_covariates = torch.tensor(np.array(X_covariates).astype(float), dtype=torch.float)
        else:
            X_covariates = None
            
        # response
        y = torch.tensor(y.reshape(-1, 1), dtype=torch.float)
        
        # initialize optimizer
        optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        
        # initialize dataloader
        if X_covariates is not None:
            dataset = torch.utils.data.TensorDataset(X_track, X_covariates, y)
        else:
            dataset = torch.utils.data.TensorDataset(X_track, y)
        train_loader = torch.utils.data.DataLoader(dataset, 
                                                   batch_size=self.batch_size,
                                                   shuffle=True) 
        #train_loader = [(X1, X2, y)]
        
        # train fcnn
        print('fitting dnn...')
        self.model = self.model.to(device)
        for epoch in tqdm(range(self.epochs)):
            train_loss = 0
            for batch_idx, data in enumerate(train_loader):
                optimizer.zero_grad()
                # print('shapes input', data[0].shape, data[1].shape)
                if X_covariates is not None:
                    preds = self.model(data[0].to(device), data[1].to(device))
                    y = data[2].to(device)
                else:
                    preds = self.model(data[0].to(device))
                    y = data[1].to(device)
                loss_fn = torch.nn.MSELoss()
                loss = loss_fn(preds, y)
                loss.backward()
                train_loss += loss.item()
                optimizer.step()
            if verbose:
                print(f'Epoch: {epoch}, Average loss: {train_loss/len(X_track):.4e}')
            elif epoch % (self.epochs // 10) == 99:
                print(f'Epoch: {epoch}, Average loss: {train_loss/len(X_track):.4e}')
            if checkpoint_fname is not None:
                pkl.dump({'model_state_dict': self.model.state_dict()},
                         open(checkpoint_fname, 'wb'))
            
    def predict(self, X_new):
        
        """
        make predictions with new data
        
        Parameters:
        ==========================================================
            X_new: pd.DataFrame
                input new data, should contain tracks and additional covariates
        """ 
        self.model.eval()
        with torch.no_grad():        

            # convert input dataframe to tensors
            X_new_track = X_new[self.track_name]
            X_new_track = torch.tensor(np.array(list(X_new_track.values)), dtype=torch.float)
            
            if len(X_new.columns) > 1:
                X_new_covariates = X_new[[c for c in X_new.columns if c != self.track_name]]
                X_new_covariates = torch.tensor(np.array(X_new_covariates).astype(float), dtype=torch.float)      
                preds = self.model(X_new_track, X_new_covariates)
            else:
                preds = self.model(X_new_track)
        return preds.data.numpy().reshape(1, -1)[0]
        
        

Classes

class neural_net_sklearn (D_in=40, H=40, p=17, epochs=1000, batch_size=100, track_name='X_same_length_normalized', arch='fcnn', torch_seed=2)

sklearn wrapper for training a neural net

Parameters:

D_in, H, p: int
    same as input to FCNN

epochs: int
    number of epochs

batch_size: int
    batch size

track_name: str
    column name of track (the tracks should be of the same length)
Expand source code
class neural_net_sklearn():
    
    """
    sklearn wrapper for training a neural net
    """
    
    def __init__(self, D_in=40, H=40, p=17, epochs=1000, batch_size=100, track_name='X_same_length_normalized', arch='fcnn', torch_seed=2):
        
        """
        Parameters:
        ==========================================================
            D_in, H, p: int
                same as input to FCNN
                
            epochs: int
                number of epochs
                
            batch_size: int
                batch size
                
            track_name: str
                column name of track (the tracks should be of the same length)
        """
        
        torch.manual_seed(torch_seed)
        self.D_in = D_in
        self.H = H
        self.p = p
        self.epochs = epochs
        self.batch_size = batch_size
        self.track_name = track_name
        self.torch_seed = torch_seed
        self.arch = arch
        
        torch.manual_seed(self.torch_seed)
        if self.arch == 'fcnn':
            self.model = models.FCNN(self.D_in, self.H, self.p)
        elif 'lstm' in self.arch:
            self.model = models.LSTMNet(self.D_in, self.H, self.p)
        elif 'cnn' in self.arch:
            self.model = models.CNN(self.D_in, self.H, self.p)
        elif 'attention' in self.arch:
            self.model = models.AttentionNet(self.D_in, self.H, self.p)   
        elif 'video' in self.arch:
            self.model = models.VideoNet()


    def fit(self, X, y, verbose=False, checkpoint_fname=None, device='cpu'):
        
        """
        Train model
        
        Parameters:
        ==========================================================
            X: pd.DataFrame
                input data, should contain tracks and additional covariates
                
            y: np.array
                input response
        """        
        
        torch.manual_seed(self.torch_seed)
        if self.arch == 'fcnn':
            self.model = models.FCNN(self.D_in, self.H, self.p)
        elif 'lstm' in self.arch:
            self.model = models.LSTMNet(self.D_in, self.H, self.p)
        elif 'cnn' in self.arch:
            self.model = models.CNN(self.D_in, self.H, self.p)
        elif 'attention' in self.arch:
            self.model = models.AttentionNet(self.D_in, self.H, self.p)   
        elif 'video' in self.arch:
            self.model = models.VideoNet()
        
        # convert input dataframe to tensors
        X_track = X[self.track_name] # track
        X_track = torch.tensor(np.array(list(X_track.values)), dtype=torch.float)
        
        if len(X.columns) > 1: # covariates
            X_covariates = X[[c for c in X.columns if c != self.track_name]]
            X_covariates = torch.tensor(np.array(X_covariates).astype(float), dtype=torch.float)
        else:
            X_covariates = None
            
        # response
        y = torch.tensor(y.reshape(-1, 1), dtype=torch.float)
        
        # initialize optimizer
        optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        
        # initialize dataloader
        if X_covariates is not None:
            dataset = torch.utils.data.TensorDataset(X_track, X_covariates, y)
        else:
            dataset = torch.utils.data.TensorDataset(X_track, y)
        train_loader = torch.utils.data.DataLoader(dataset, 
                                                   batch_size=self.batch_size,
                                                   shuffle=True) 
        #train_loader = [(X1, X2, y)]
        
        # train fcnn
        print('fitting dnn...')
        self.model = self.model.to(device)
        for epoch in tqdm(range(self.epochs)):
            train_loss = 0
            for batch_idx, data in enumerate(train_loader):
                optimizer.zero_grad()
                # print('shapes input', data[0].shape, data[1].shape)
                if X_covariates is not None:
                    preds = self.model(data[0].to(device), data[1].to(device))
                    y = data[2].to(device)
                else:
                    preds = self.model(data[0].to(device))
                    y = data[1].to(device)
                loss_fn = torch.nn.MSELoss()
                loss = loss_fn(preds, y)
                loss.backward()
                train_loss += loss.item()
                optimizer.step()
            if verbose:
                print(f'Epoch: {epoch}, Average loss: {train_loss/len(X_track):.4e}')
            elif epoch % (self.epochs // 10) == 99:
                print(f'Epoch: {epoch}, Average loss: {train_loss/len(X_track):.4e}')
            if checkpoint_fname is not None:
                pkl.dump({'model_state_dict': self.model.state_dict()},
                         open(checkpoint_fname, 'wb'))
            
    def predict(self, X_new):
        
        """
        make predictions with new data
        
        Parameters:
        ==========================================================
            X_new: pd.DataFrame
                input new data, should contain tracks and additional covariates
        """ 
        self.model.eval()
        with torch.no_grad():        

            # convert input dataframe to tensors
            X_new_track = X_new[self.track_name]
            X_new_track = torch.tensor(np.array(list(X_new_track.values)), dtype=torch.float)
            
            if len(X_new.columns) > 1:
                X_new_covariates = X_new[[c for c in X_new.columns if c != self.track_name]]
                X_new_covariates = torch.tensor(np.array(X_new_covariates).astype(float), dtype=torch.float)      
                preds = self.model(X_new_track, X_new_covariates)
            else:
                preds = self.model(X_new_track)
        return preds.data.numpy().reshape(1, -1)[0]

Methods

def fit(self, X, y, verbose=False, checkpoint_fname=None, device='cpu')

Train model

Parameters:

X: pd.DataFrame
    input data, should contain tracks and additional covariates

y: np.array
    input response
Expand source code
def fit(self, X, y, verbose=False, checkpoint_fname=None, device='cpu'):
    
    """
    Train model
    
    Parameters:
    ==========================================================
        X: pd.DataFrame
            input data, should contain tracks and additional covariates
            
        y: np.array
            input response
    """        
    
    torch.manual_seed(self.torch_seed)
    if self.arch == 'fcnn':
        self.model = models.FCNN(self.D_in, self.H, self.p)
    elif 'lstm' in self.arch:
        self.model = models.LSTMNet(self.D_in, self.H, self.p)
    elif 'cnn' in self.arch:
        self.model = models.CNN(self.D_in, self.H, self.p)
    elif 'attention' in self.arch:
        self.model = models.AttentionNet(self.D_in, self.H, self.p)   
    elif 'video' in self.arch:
        self.model = models.VideoNet()
    
    # convert input dataframe to tensors
    X_track = X[self.track_name] # track
    X_track = torch.tensor(np.array(list(X_track.values)), dtype=torch.float)
    
    if len(X.columns) > 1: # covariates
        X_covariates = X[[c for c in X.columns if c != self.track_name]]
        X_covariates = torch.tensor(np.array(X_covariates).astype(float), dtype=torch.float)
    else:
        X_covariates = None
        
    # response
    y = torch.tensor(y.reshape(-1, 1), dtype=torch.float)
    
    # initialize optimizer
    optimizer = optim.Adam(self.model.parameters(), lr=0.001)
    
    # initialize dataloader
    if X_covariates is not None:
        dataset = torch.utils.data.TensorDataset(X_track, X_covariates, y)
    else:
        dataset = torch.utils.data.TensorDataset(X_track, y)
    train_loader = torch.utils.data.DataLoader(dataset, 
                                               batch_size=self.batch_size,
                                               shuffle=True) 
    #train_loader = [(X1, X2, y)]
    
    # train fcnn
    print('fitting dnn...')
    self.model = self.model.to(device)
    for epoch in tqdm(range(self.epochs)):
        train_loss = 0
        for batch_idx, data in enumerate(train_loader):
            optimizer.zero_grad()
            # print('shapes input', data[0].shape, data[1].shape)
            if X_covariates is not None:
                preds = self.model(data[0].to(device), data[1].to(device))
                y = data[2].to(device)
            else:
                preds = self.model(data[0].to(device))
                y = data[1].to(device)
            loss_fn = torch.nn.MSELoss()
            loss = loss_fn(preds, y)
            loss.backward()
            train_loss += loss.item()
            optimizer.step()
        if verbose:
            print(f'Epoch: {epoch}, Average loss: {train_loss/len(X_track):.4e}')
        elif epoch % (self.epochs // 10) == 99:
            print(f'Epoch: {epoch}, Average loss: {train_loss/len(X_track):.4e}')
        if checkpoint_fname is not None:
            pkl.dump({'model_state_dict': self.model.state_dict()},
                     open(checkpoint_fname, 'wb'))
def predict(self, X_new)

make predictions with new data

Parameters:

X_new: pd.DataFrame
    input new data, should contain tracks and additional covariates
Expand source code
def predict(self, X_new):
    
    """
    make predictions with new data
    
    Parameters:
    ==========================================================
        X_new: pd.DataFrame
            input new data, should contain tracks and additional covariates
    """ 
    self.model.eval()
    with torch.no_grad():        

        # convert input dataframe to tensors
        X_new_track = X_new[self.track_name]
        X_new_track = torch.tensor(np.array(list(X_new_track.values)), dtype=torch.float)
        
        if len(X_new.columns) > 1:
            X_new_covariates = X_new[[c for c in X_new.columns if c != self.track_name]]
            X_new_covariates = torch.tensor(np.array(X_new_covariates).astype(float), dtype=torch.float)      
            preds = self.model(X_new_track, X_new_covariates)
        else:
            preds = self.model(X_new_track)
    return preds.data.numpy().reshape(1, -1)[0]