Models

API details.
#default_exp models
#hide
from nbdev.showdoc import *
#export
import itertools
from torch.nn import functional as F
import torch.nn as nn
import torch
class ToyODE(nn.Module):
    """ 
    ODE derivative network
    
    feature_dims (int) default '5': dimension of the inputs, either in ambient space or embedded space.
    layers (list of int) default '[64]': the hidden layers of the network.
    activation (torch.nn) default '"ReLU"': activation function applied in between layers.
    scales (NoneType|list of float) default 'None': the initial scales for the noise in the trajectories. One scale per bin; add more if using an adaptive ODE solver.
    n_aug (int) default '2': number of dimensions added to the input of the network. The total input dimension is feature_dims + 1 (time) + n_aug. 
    
    Method
    forward (Callable)
        forward pass of the ODE derivative network.
        Parameters:
        t (torch.tensor): time of the evaluation.
        x (torch.tensor): position of the evaluation.
        Return:
        the derivative at time t and position x.   
    """
    def __init__(
        self, 
        feature_dims=5,
        layers=[64],
        activation='ReLU',
        scales=None,
        n_aug=2
    ):
        super(ToyODE, self).__init__()
        # Layer widths: the input is the state, plus time, plus n_aug augmented dimensions.
        steps = [feature_dims+1+n_aug, *layers, feature_dims]
        pairs = zip(steps, steps[1:])

        # Interleave the Linear layers with activations, dropping the trailing activation.
        chain = list(itertools.chain(*list(zip(
            map(lambda e: nn.Linear(*e), pairs), 
            itertools.repeat(getattr(nn, activation)())
        ))))[:-1]

        self.chain = chain
        self.seq = nn.Sequential(*chain)
        
        # One learnable noise scale per time bin, if scales are given.
        self.alpha = nn.Parameter(torch.tensor(scales, requires_grad=True).float()) if scales is not None else None
        self.n_aug = n_aug        
        
    def forward(self, t, x): # NOTE: torchdiffeq requires the signature forward(self, t, x)
        # Augment the state with the current time and n_aug zero channels.
        zero = torch.tensor([0]).cuda() if x.is_cuda else torch.tensor([0])
        zeros = zero.repeat(x.size()[0], self.n_aug)
        time = t.repeat(x.size()[0], 1)
        aug = torch.cat((x, time, zeros), dim=1)
        dxdt = self.seq(aug)
        # Optionally add Gaussian noise scaled by the learnable per-bin parameter.
        if self.alpha is not None:
            z = torch.randn(dxdt.size(), requires_grad=False).cuda() if dxdt.is_cuda else torch.randn(dxdt.size(), requires_grad=False)
            dxdt = dxdt + z * self.alpha[int(t - 1)]
        return dxdt
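A minimal sketch of evaluating the derivative network directly; the batch size and dimensions below are illustrative:
func = ToyODE(feature_dims=5, layers=[64], activation='ReLU', scales=None, n_aug=2)
x = torch.randn(10, 5)  # batch of 10 points in 5-d space
t = torch.tensor(1.0)   # scalar evaluation time
dxdt = func(t, x)       # derivative at (t, x), shape (10, 5)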
#export
def make_model(
    feature_dims=5,
    layers=[64],
    output_dims=5,
    activation='ReLU',
    which='ode',
    method='rk4',
    rtol=None,
    atol=None,
    scales=None,
    n_aug=2,
    noise_type='diagonal', sde_type='ito',
    use_norm=False,
    use_cuda=False,
    in_features=2, out_features=2, gunc=None
):
    """
    Creates the 'ode' model, the 'sde' model, or the Geodesic Autoencoder. 
    See the parameters of the respective classes.
    """
    if which == 'ode':
        ode = ToyODE(feature_dims, layers, activation, scales, n_aug)
        model = ToyModel(ode, method, rtol, atol, use_norm=use_norm)
    elif which == 'sde':
        ode = ToyODE(feature_dims, layers, activation, scales, n_aug)
        model = ToySDEModel(
            ode, method, noise_type, sde_type,
            in_features=in_features, out_features=out_features, gunc=gunc
        )
    else:
        model = ToyGeo(feature_dims, layers, output_dims, activation)
    if use_cuda:
        model.cuda()
    return model
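For instance, a sketch of building the Neural ODE variant (the argument values here are illustrative):
model = make_model(feature_dims=5, layers=[64], which='ode', method='rk4', use_cuda=False)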
#export
import itertools
import torch.nn as nn
from torch.nn import functional as F

class Autoencoder(nn.Module):
    """ 
    Geodesic Autoencoder
    
    encoder_layers (list of int) default '[100, 100, 20]': encoder_layers[0] is the feature dimension, and encoder_layers[-1] the embedded dimension.
    decoder_layers (list of int) default '[20, 100, 100]': decoder_layers[0] is the embedded dimension, and decoder_layers[-1] the feature dimension.
    activation (torch.nn) default '"Tanh"': activation function applied in between layers.
    use_cuda (bool) default 'False': whether to use GPU or CPU.
    
    Method
    encode
        forward pass of the encoder
        x (torch.tensor): observations
        Return:
        the encoded observations
    decode
        forward pass of the decoder
        z (torch.tensor): embedded observations
        Return:
        the decoded observations
    forward (Callable):
        full forward pass, encoder and decoder
        x (torch.tensor): observations
        Return:
        denoised observations
    """

    def __init__(
        self,
        encoder_layers = [100, 100, 20],
        decoder_layers = [20, 100, 100],
        activation = 'Tanh',
        use_cuda = False
    ):        
        super(Autoencoder, self).__init__()
        if decoder_layers is None:
            # Mirror the encoder layers if no decoder is specified.
            decoder_layers = [*encoder_layers[::-1]]
        device = 'cuda' if use_cuda else 'cpu'
        
        encoder_shapes = list(zip(encoder_layers, encoder_layers[1:]))
        decoder_shapes = list(zip(decoder_layers, decoder_layers[1:]))
        
        encoder_linear = list(map(lambda a: nn.Linear(*a), encoder_shapes))
        decoder_linear = list(map(lambda a: nn.Linear(*a), decoder_shapes))
        
        # Interleave the Linear layers with activations, dropping the trailing activation.
        encoder_riffle = list(itertools.chain(*zip(encoder_linear, itertools.repeat(getattr(nn, activation)()))))[:-1]
        encoder = nn.Sequential(*encoder_riffle).to(device)
        
        decoder_riffle = list(itertools.chain(*zip(decoder_linear, itertools.repeat(getattr(nn, activation)()))))[:-1]
        decoder = nn.Sequential(*decoder_riffle).to(device)
        
        self.encoder = encoder
        self.decoder = decoder

    def encode(self, x):
        return self.encoder(x)

    def decode(self, z):
        return self.decoder(z)

    def forward(self, x):
        z = self.encoder(x)
        return self.decoder(z)
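A minimal round-trip through the autoencoder; the dimensions below are illustrative:
ae = Autoencoder(encoder_layers=[100, 100, 20], decoder_layers=[20, 100, 100], activation='Tanh')
x = torch.randn(8, 100)   # batch of 8 observations with 100 features
z = ae.encode(x)          # embedded observations, shape (8, 20)
x_hat = ae(x)             # reconstructed observations, shape (8, 100)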
#export

from torchdiffeq import odeint_adjoint as odeint
import os, math, numpy as np
import torch
import torch.nn as nn
class ToyModel(nn.Module):
    """ 
    Neural ODE
        func (nn.Module): the network modeling the derivative.
        method (str) default '"rk4"': any method from torchdiffeq.
        rtol (NoneType | float): the relative tolerance of the ODE solver.
        atol (NoneType | float): the absolute tolerance of the ODE solver.
        use_norm (bool): if True, store the squared norm of the derivative at each time point in `norm`.
        norm (list of torch.tensor): the recorded norms of the derivative.
        
        Method
        forward (Callable)
            x (torch.tensor): the initial sample, assumed to be taken at time t[0].
            t (torch.tensor): the time points at which to evaluate the ODE.
            Return:
            the sample at the last time point, or the whole sequence if return_whole_sequence is True.      
    """
    
    def __init__(self, func, method='rk4', rtol=None, atol=None, use_norm=False):
        super(ToyModel, self).__init__()        
        self.func = func
        self.method = method
        self.rtol = rtol
        self.atol = atol
        self.use_norm = use_norm
        self.norm = []

    def forward(self, x, t, return_whole_sequence=False):
        if self.use_norm:
            # Record the squared norm of the derivative at each requested time point.
            for time in t: 
                self.norm.append(torch.linalg.norm(self.func(time, x)).pow(2))

        # Only pass the tolerances that were explicitly set.
        kwargs = {}
        if self.rtol is not None:
            kwargs['rtol'] = self.rtol
        if self.atol is not None:
            kwargs['atol'] = self.atol
        x = odeint(self.func, x, t, method=self.method, **kwargs)

        x = x[-1] if not return_whole_sequence else x
        return x
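A sketch of integrating a batch from t[0] to later time points; shapes and values here are illustrative:
ode = ToyODE(feature_dims=5)
model = ToyModel(ode, method='rk4')
x0 = torch.randn(10, 5)
t = torch.linspace(0, 1, 5)
x_last = model(x0, t)                             # sample at the final time, shape (10, 5)
traj = model(x0, t, return_whole_sequence=True)   # whole trajectory, shape (5, 10, 5)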
#export

from torchdiffeq import odeint_adjoint as odeint
import os, math, numpy as np
import torch
import torch.nn as nn
import torchsde

class ToySDEModel(nn.Module):
    """ 
    Neural SDE model
        func (nn.Module): the drift term.
        gunc (nn.Module): the diffusion term; if None, a state-only nn.Linear is used.
        method (str) default '"euler"': method of the SDE solver.
        
        Method
        forward (Callable)
            x (torch.tensor): the initial sample, assumed to be taken at time t[0].
            t (torch.tensor): the time points at which to evaluate the SDE.
            Return:
            the sample at the last time point, or the whole sequence if return_whole_sequence is True.  
    """
    
    def __init__(self, func, method='euler', noise_type='diagonal', sde_type='ito', 
    in_features=2, out_features=2, gunc=None, dt=0.1):
        super(ToySDEModel, self).__init__()        
        self.func = func
        self.method = method
        self.noise_type = noise_type
        self.sde_type = sde_type
        if gunc is None:
            # Default diffusion: a linear map of the state only.
            self._gunc_args = 'y'
            self.gunc = nn.Linear(in_features, out_features)
        else:
            # A user-supplied diffusion is called with both time and state.
            self._gunc_args = 't,y'
            self.gunc = gunc

        self.dt = dt
        
    def f(self, t, y):
        # Drift term.
        return self.func(t, y)

    def g(self, t, y):
        # Diffusion term; dispatch on whether gunc takes (t, y) or only y.
        # An alternative diffusion, kept for reference: 0.3 * torch.sigmoid(torch.cos(t) * torch.exp(-y))
        return self.gunc(t, y) if self._gunc_args == 't,y' else self.gunc(y)

    def forward(self, x, t, return_whole_sequence=False, dt=None):
        # An explicit dt argument takes precedence over the instance default.
        dt = dt if dt is not None else (self.dt if self.dt is not None else 0.1)
        x = torchsde.sdeint(self, x, t, method=self.method, dt=dt)
       
        x = x[-1] if not return_whole_sequence else x
        return x
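A sketch of sampling an SDE path; with the default gunc, in_features and out_features must match the data dimension (the values below are illustrative):
drift = ToyODE(feature_dims=2)
sde = ToySDEModel(drift, method='euler', noise_type='diagonal', sde_type='ito', in_features=2, out_features=2)
x0 = torch.randn(16, 2)
t = torch.linspace(0, 1, 5)
x_last = sde(x0, t)   # sample at the final time, shape (16, 2)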