Travaux pratiques : auto-encodeurs variationnels

L’objectif de cette séance de travaux pratiques est d’illustrer la construction d’un espace latent en utilisant un auto-encodeur classique, puis un auto-encodeur variationnel. (cf. Kingma et Welling ).

Nous utiliserons ensuite le VAE pour générer des données synthétiques mais plausibles vis à vis de la distribution des données d’apprentissage.

Le TP est dimensionné de sorte à pouvoir être terminé sans accélérateur graphique pour les calculs en utilisant la base de données Fashion-MNIST. Si vous disposez d’un GPU (par exemple, parce que vous travaillez sur Google Colab), vous pouvez changer la valeur de la variable use_gpu à True.

import torch
use_gpu = False
device = torch.device("cuda:0" if use_gpu and torch.cuda.is_available() else "cpu")
print(f"Exécution sur {device}")
# Imports des bibliothèques utiles
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

from matplotlib import pyplot as plt

Préambule

Pour commencer, nous allons charger en mémoire les données de Fashion-MNIST et en visualiser quelques unes. Ces images sont similaires en format aux données de MNIST : 28x28 pixels en niveaux de gris.

Ce jeu de données est préintégré dans la bibliothèque torchvision:

from torch.utils.data import DataLoader
from torchvision.datasets import FashionMNIST

from torchvision.transforms import ToTensor, ToPILImage

train_dataset = FashionMNIST(root='./data/FashionMNIST', download=True, train=True, transform=ToTensor())
test_dataset = FashionMNIST(root='./data/FashionMNIST', download=True, train=False, transform=ToTensor())

Nous pouvons visualiser quelques unes de ces images:

n_images = 5

fig = plt.figure()
for i, (image, label) in enumerate(train_dataset):
    fig.add_subplot(1, n_images+1, i+1)
    plt.imshow(ToPILImage()(image), cmap="gray")
    plt.axis("off")
    if i >= n_images:
        break
plt.show()

Auto-encodeur

Implémentation

Notre premier modèle sera un auto-encodeur convolutif doté de l’architecture ci-dessous.

Pour l’encodeur :

  • une couche de convolution (kernel_size=4, in_channels=1, out_channels=32, stride=2, padding=1, activation ReLU)

  • une couche de convolution (kernel_size=4, in_channels=32, out_channels=64, stride=2, padding=1, activation ReLU)

  • une couche linéaire (in_features=64*7*7, out_features=latent_dimension)

Pour le décodeur:

  • une couche linéaire (in_features=latent_dimension, out_features=64*7*7, activation ReLU)

  • une couche de convolution transposée (kernel size=4, in_channels=64, out_channels=32, stride=2, padding=1, activation ReLU)

  • une couche de convolution transposée (kernel size=4, in_channels=32, out_channels=1, stride=2, padding=1, activation sigmoide)

Note

Les filtres convolutifs sont choisis de taille 4x4 afin d’éviter des problèmes d’aliasing.

Question

Compléter l’implémentation ci-dessous de l’auto-encodeur dont l’architecture vient d’être décrite. Cette implémentation utilise l’interface torch.nn.Module dont la documentation peut vous être utile.

En plus de la reconstruction par l’auto-encodeur, on souhaite que la méthode forward() renvoie également le code intermédiaire z (un vecteur de longueur latent_dimension) obtenu après le passage dans le décodeur.

Indice: l’utilisation de la méthode .view() ou de la couche nn.Flatten() peut être utile pour ré-arranger les tenseurs avant ou après les couches linéaires. Par exemple, x.view(-1, 64, 7, 7) permet de transformer un tenseur de dimensions (batch, 3136) en un tenseur de dimensions (batch, 64, 7, 7)

Correction

class AutoEncoder(nn.Module):
    def __init__(self, latent_dimension):
        super(AutoEncoder, self).__init__()
        self.encoder = nn.Sequential(nn.Conv2d(1, 32, kernel_size=4, stride=2, padding=1),
                                     nn.ReLU(),
                                     nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=1),
                                     nn.ReLU(),
                                     nn.Flatten(),
                                     nn.Linear(in_features=64*7*7, out_features=latent_dimension)
                                     )
        self.decoder_linear = nn.Linear(in_features=latent_dimension, out_features=64*7*7)
        self.decoder = nn.Sequential(nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1),
                                     nn.ReLU(),
                                     nn.ConvTranspose2d(32, 1, kernel_size=4, stride=2, padding=1),
                                     nn.Sigmoid(),
                                    )

    def forward(self, x):
        ###### Votre code ici ########
        z = self.encoder(x)
        hat_x = F.relu(self.decoder_linear(z))
        hat_x = hat_x.view(-1, 64, 7, 7)
        hat_x = self.decoder(hat_x)
        return hat_x, z

Entraînement

Une fois le modèle implémenté, nous pouvons utiliser la fonction train ci-dessous pour réaliser l’apprentissage. L’optimisation se fait selon le critère choisi dans la variable criterion (par défaut, il s’agit de l’erreur quadratique moyenne comme critère de reconstruction).

from tqdm.notebook import trange, tqdm

def train(net, train_dataset, epochs=10, learning_rate=1e-3, batch_size=128, device=device):
    # Création du DataLoader pour charger les données
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
    # Définition de l'algorithme d'optimisation (Adam, variante de la SGD)
    optimizer = torch.optim.Adam(params=net.parameters(), lr=learning_rate, weight_decay=1e-5)
    # Choix de la fonction de coût
    criterion = nn.MSELoss()
    # Passe le modèle en mode "apprentissage"
    net = net.to(device)
    net = net.train()

    train_loss_avg = []

    t = trange(1, epochs + 1, desc="Entraînement du modèle")
    for epoch in t:
        avg_loss = 0.
        # Parcours du dataset pour une epoch
        for images, _ in tqdm(train_dataloader):
            # les labels sont ignorés pour l'apprentissage de l'auto-encodeur

            images = images.to(device)
            # Calcul de la reconstruction
            reconstructions, _ = net(images)
            # Calcul del'erreur
            loss = criterion(reconstructions, images)

            # Rétropropagation du gradient
            optimizer.zero_grad()
            loss.backward()
            # Descente de gradient (une itération)
            optimizer.step()
            avg_loss += loss.item()

        avg_loss /= len(train_dataloader)
        t.set_description(f"Epoch {epoch}: loss = {avg_loss:.3f}")
    return net

Nous pouvons créer un modèle en spécifiant la dimension de son espace latent (par exemple, 10):

latent_dimension = 10
net = AutoEncoder(latent_dimension)

Puis démarrer son entraînement (sur CPU, cette opération peut prendre jusqu’à une dizaine de minutes) :

net = train(net, train_dataset)

Visualisation des reconstructions

Une fois l’apprentissage terminé, nous pouvons visualiser quelques reconstructions obtenues grâce à l’auto-encodeur. Cela permet de jauger qualitativement des performances du modèle en reconstruction.

Note

La compréhension fine des fonctions make_grid, show_grid et visualize_reconstructions n’est pas indispensable à la poursuite du TP.

from torchvision.utils import make_grid

net = net.eval()
test_dataloader = DataLoader(test_dataset, batch_size=128, shuffle=False)


def show_grid(grid):
    plt.imshow(np.transpose(grid.numpy(), (1, 2, 0)))
    plt.show()

def visualize_reconstructions(net, images, device=device):
    # Mode inférence
    with torch.no_grad():
        images = images.to(device)
        reconstructions = net(images)[0]
        image_grid = make_grid(reconstructions[1:50], 10, 5).cpu()
        return image_grid

images, _ = iter(test_dataloader).next()

# Images de test
plt.figure(figsize=(12, 6))
plt.title("Images du jeu de test")
show_grid(make_grid(images[1:50],10,5))

# Reconstruction et visualisation des images reconstruites
plt.figure(figsize=(12, 6))
plt.title("Reconstruction par l'auto-encodeur")
show_grid(visualize_reconstructions(net, images))

Débruitage

Une capacité intéressante des auto-encodeurs est leur capacité à apprendre des filtres robustes au bruit. En particulier, en bruitant légèrement une observation, on retrouve généralement la reconstruction moyenne non-bruitée. Cette propriété de débruitage est particulièrement intéressante pour l’amélioration de la qualité des signaux (images, sons, etc.).

Observons la capacité de débruitage de notre auto-encodeur sur un échantillon d’images de test.

Question

Compléter le code ci-dessous pour ajouter un bruit blanc uniforme aux images de test contenues dans le tenseur images. Pensez à ajuster l’amplitude du bruit et à limiter les valeurs des pixels de sortie à la plage autorisée \([0,1]\) (la fonction clamp peut vous aider).

Correction

# Bruit blanc (uniforme) centré en 0
noise = torch.rand_like(images) - 0.5
# Ajout du bruit + troncature des valeurs en dehors de [0,1]
noisy_images = torch.clamp(images + 0.5 * noise, 0, 1)

Nous pouvons alors visualiser la reconstruction des images bruitées.

# Images de test
plt.figure(figsize=(12, 6))
plt.title("Images du jeu de test bruitées")
show_grid(make_grid(noisy_images[1:50],10,5))

# Reconstruction et visualisation des images reconstruites
plt.figure(figsize=(12, 6))
plt.title("Reconstruction par l'auto-encodeur")
show_grid(visualize_reconstructions(net, noisy_images))

Question

Comparer les reconstructions des images bruitées aux reconstructions obtenues sur les images de test originales. Que constatez-vous ?

Correction

Pour des bruits de faible amplitude, les images reconstruites sont moins bruitées. Autrement dit, l’auto-encodeur « débruite » en partie les images. En revanche, lorsque l’amplitude du bruit est forte, le signal utile de l’image disparaît et la sortie devient décorrélée de l’entrée.

Auto-encodeurs variationnels

Implémentation

Nous allons à présent implémenter un VAE convolutif qui hérite de la même structure que l’auto-encodeur que nous avons précédemment défini. Pour nous simplifier les choses par la suite, nous allons commencer par séparer le sous-réseau qui définit l’encodeur de celui qui définit le décodeur.

Question: en reprenant ce qui a été fait plus haut pour l’auto-encodeur classique, compléter les implémentations ci-dessous de l’encodeur et du décodeur pour le VAE. On rappelle que, contrairement à l’auto-encodeur, la sortie de l’encodeur est double :

  • le vecteur mu qui contient la moyenne de la gaussienne dans l’espace latent,

  • le vecteur sigma qui contient les variances selon les différentes directions de la gaussienne dans l’espace latent.

Ces valeurs seront les paramètres de la gaussienne associée à une observation \(x\). Ces deux vecteurs ont pour dimension la dimension de l’espace latent.

Correction

class Encoder(nn.Module):
    def __init__(self, latent_dimension):
        super(Encoder, self).__init__()
        self.model = nn.Sequential(nn.Conv2d(1, 32, kernel_size=4, stride=2, padding=1),
                                     nn.ReLU(),
                                     nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=1),
                                     nn.ReLU(),
                                     nn.Flatten(),
                                   )
        self.linear1 = nn.Linear(in_features=64*7*7, out_features=latent_dimension)
        self.linear2 = nn.Linear(in_features=64*7*7, out_features=latent_dimension)

    def forward(self, x):
        x = self.model(x)
        x_mu = self.linear1(x)
        x_logvar = self.linear2(x)
        return x_mu, x_logvar

class Decoder(nn.Module):
    def __init__(self, latent_dimension):
        super(Decoder, self).__init__()
        self.linear = nn.Linear(in_features=latent_dimension, out_features=64*7*7)
        self.model = nn.Sequential(nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1),
                                     nn.ReLU(),
                                     nn.ConvTranspose2d(32, 1, kernel_size=4, stride=2, padding=1),
                                     nn.Sigmoid(),
                                    )

    def forward(self, z):
        hat_x = F.relu(self.linear(z))
        hat_x = hat_x.view(-1, 64, 7, 7)
        hat_x = self.model(hat_x)
        return hat_x

Nous allons à présent combiner l’encodeur et le décodeur pour former l’auto-encodeur variationnel complet. Il y a néanmoins une petite subtilité car nous devons implémenter l’astuce de reparamétrisation. Celle-ci est implémenter dans la méthode latent_sample

Lors d’un passage avant (forward), le schéma suivant doit se dérouler :

  1. L’encodeur prend x en entrée et produit la moyenne mu et la variance logvar de la distribution. En pratique, on verra aussi x_recon la reconstruction de x.

  2. On tire un échantillon aléatoire z dans l’espace latent à l’aide de la méthode latent_sample. L’échantillonnage est fait selon la distribution gaussienne latente associée à \(x\) grâce à la reparamétrisation. Lors de l’inférence, on ne réalisera pas d’échantillonnage mais on se contentera d’utiliser la moyenne de la gaussienne.

  3. L’échantillon aléatoire z est passé dans le décodeur de sorte à obtenir la reconstruction x_recon.

Question

Compléter l’implémentation ci-dessous de l’auto-encodeur variationnel.

Correction

class VariationalAutoencoder(nn.Module):
    def __init__(self, latent_dim):
        super(VariationalAutoencoder, self).__init__()
        self.encoder = Encoder(latent_dim)
        self.decoder = Decoder(latent_dim)

    def forward(self, x):
        latent_mu, latent_logvar = self.encoder(x)
        z = self.latent_sample(latent_mu, latent_logvar)
        hat_x = self.decoder(z)
        return hat_x, latent_mu, latent_logvar

    def latent_sample(self, mu, logvar):
        if self.training:
            # the reparameterization trick
            std = logvar.mul(0.5).exp_()
            eps = torch.empty_like(std).normal_()
            return eps.mul(std).add_(mu)
        else:
            return mu

Enfin, il reste à définir la fonction de coût du VAE. D’après le cours, on cherche à maximiser l’ELBO \(\mathcal L\). Ici, on choisira de minimiser \(-\mathcal L\) avec

\[\mathcal{L}(\theta,\phi ; \boldsymbol x) = \underbrace{\mathbb{E}_{q_\phi(\boldsymbol z | \boldsymbol x)} \left [ \log p_\theta(\boldsymbol x | \boldsymbol z) \right ]}_{\text{Espérance de la vraisemblance}} - \underbrace{KL\, \left (q_\phi(\boldsymbol z | \boldsymbol x) \, || \, p_\theta(\boldsymbol z)\right)}_{\text{écart au prior}}\]

La fonction de coût pour une reconstruction sur une seule donnée \(\boldsymbol x^{(i)}\) est approximée par:

\[-\mathcal{L}(\theta,\phi ; \boldsymbol x^{(i)}) \simeq - \frac{1}{2} \sum_j^d \bigl ( 1 + \log((\sigma_j^{(i)})^2) - (\mu_j^{(i)})^2 - (\sigma_j^{(i)})^2 \bigr) - \log p_\theta(\boldsymbol x^{(i)} | \boldsymbol z^{(i)})\]

\(d\) est la taille de l’espace latent.

Note

Dans la plupart des cas, la vraisemblance est supposée gaussienne et la fonction de coût évaluant la reconstruction correspondera donc à l’erreur quadratique moyenne (F.mse_loss()). Dans notre cas, la distribution des valeurs des pixels de Fashion-MNIST est plutôt bimodale. Les images étant à valeurs entre 0 et 1, il est possible d’utiliser une entropie croisée binaire (F.bce_loss()) et c’est cette version qui donne les meilleurs résultats.

Le prior \(p_\theta(\boldsymbol z)\) est supposé être donné par une loi normale centrée réduite. La divergence de Kullback-Leibler est alors donnée par:

\[KL(q_\phi(\boldsymbol z | \boldsymbol x) || p_\theta(\boldsymbol z)) = \frac{1}{2} \bigl ( \text{tr}(\boldsymbol \sigma \boldsymbol I) + \boldsymbol \mu^T \boldsymbol \mu - k - \log \text{det}(\boldsymbol \sigma \boldsymbol I) \bigr)\]

Question

Modifier la fonction vae_loss() ci-dessous de sorte à calculer la fonction de coût du \(\beta\)-auto-encodeur variationnel (on souhaite pouvoir changer \(\beta\) facilement plus tard si besoin).

Correction

beta = 1.0

def vae_loss(hat_x, x, mu, logvar):
    reconstruction_loss = F.binary_cross_entropy(hat_x.view(-1, 28*28), x.view(-1, 28*28), reduction='sum')
    kl_divergence = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return reconstruction_loss + beta * kl_divergence

Entraînement

Ce travail effectué, nous disposons du modèle, de la fonction objectif et des données. Il ne reste plus qu’à réaliser l’apprentissage du VAE.

from tqdm.notebook import trange, tqdm

def train_vae(net, train_dataset, epochs=10, learning_rate=1e-3, batch_size=128, device=device):
    # Création du DataLoader pour charger les données
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    # Définition de l'algorithme d'optimisation (Adam, variante de la SGD)
    optimizer = torch.optim.Adam(params=net.parameters(), lr=learning_rate, weight_decay=1e-5)
    # Choix de la fonction de coût
    criterion = vae_loss
    # Passe le modèle en mode "apprentissage"
    net = net.to(device)
    net = net.train()

    t = trange(1, epochs + 1, desc="Entraînement du modèle")
    for epoch in t:
        avg_loss = 0.
        # Parcours du dataset pour une epoch
        for images, _ in tqdm(train_dataloader):
            # les labels sont ignorés pour l'apprentissage de l'auto-encodeur

            images = images.to(device)
            # Calcul de la reconstruction
            reconstructions, latent_mu, latent_logvar = net(images)
            # Calcul de l'erreur
            loss = criterion(reconstructions, images, latent_mu, latent_logvar)

            # Rétropropagation du gradient
            optimizer.zero_grad()
            loss.backward()
            # Descente de gradient (une itération)
            optimizer.step()
            avg_loss += loss.item()

        avg_loss /= len(train_dataloader)
        t.set_description(f"Epoch {epoch}: loss = {avg_loss:.3f}")
    return net.to("cpu")

Question

Créer un modèle de VAE et lancer son apprentissage à l’aide de la fonction train_vae ci-dessus.

Correction

vae = VariationalAutoencoder(latent_dimension)
train_vae(vae, train_dataset)

Visualisation des reconstruction

Comme dans le cas de l’auto-encodeur classique, nous pouvons visualiser quelques reconstructions pour juger qualitativement des performances du modèle.

vae = vae.to("cpu")
images, _ = iter(test_dataloader).next()

# Images de test
plt.figure(figsize=(12, 6))
plt.title("Images du jeu de test bruitées")
show_grid(make_grid(images[1:50],10,5))

# Reconstruction et visualisation des images reconstruites
plt.figure(figsize=(12, 6))
plt.title("Reconstruction par l'auto-encodeur")
show_grid(visualize_reconstructions(vae, images))

Echantillonnage dans l’espace latent

Un VAE peut générer de nouvelles données à partir de vecteur tirés dans l’espacee latent. Plus le poids donné à la KL-divergence est important (\(\beta\) élevé), plus cet espace est “rempli”, au sens où la distribution de l’espace latent se rapproche d’une loi normale centrée réduite. Cette capacité d’interpolation augmentée vient en contrepartie de reconstructions pouvant être de significativement moins bonne qualité que des auto-encodeurs classiques.

Question

Échantillonner quelques vecteurs dans l’espace latent selon une loi centrée réduite. Décoder ces vecteurs pour obtenir des nouvelles images synthétiques. Vous pouvez passer par l’attribut .decoder de l’objet VAE pour accéder directement au décodeur.

Correction

vae.eval()

with torch.no_grad():

    # Échantillonnage selon une loi normale
    latent = torch.randn(100, latent_dimension, device=device)

    # Reconstruction
    fake_images = vae.decoder(latent).cpu()

fig = plt.figure(figsize=(12, 12))
show_grid(make_grid(fake_images[1:50],10,5))

Interpolation dans l’espace latent

Comme pour l’auto-encodeur classique, il est possible d’interpoler entre deux vecteurs arbitraires dans l’espace latent. Pour un auto-encodeur normal, cette interpolation produit rarement des images plausibles car l’espace latent est discontinu et contient de nombreux “trous”. En revanche, ce n’est pas le cas du VAE grâce au prior gaussien.

Question

Interpoler entre deux vecteurs aléatoires dans l’espace latent et visualiser le résultat (par exemple, 10 images sur le segment entre \(z_1\) et \(z_2\)).

Correction

z1 = torch.randn(1, latent_dimension, device=device)
z2 = torch.randn(1, latent_dimension, device=device)

n_steps = 10

fig = plt.figure(figsize=(16, 8))

for idx, alpha in enumerate(np.linspace(0, 1, n_steps + 1)):
    z = (1 - alpha) * z1 + alpha * z2
    with torch.no_grad():
        fake_image = vae.decoder(z)[0,0,:,:].cpu().numpy()

    fig.add_subplot(1, n_steps + 1, idx + 1)
    plt.imshow(fake_image)
    plt.title(f"$\\alpha = {alpha:0.1f}$")
plt.show()

Visualisation de l’espace latent

Nous pouvons comparer l’espace latent de l’auto-encodeur classique et celui de l’auto-encodeur variationnel, par exemple à l’aide de t-SNE pour projeter les codes latents du jeu de test sur un plan en deux dimensions.

Question

(optionnel, pour l’approfondissement) Pour toutes les images du jeu de test de Fashion-MNIST, calculer le code latent associé (on prendra la moyenne de la distribution dans le cas du VAE). Appliquer une réduction de dimension non-linéaire en utilisant la version de t-SNE disponible dans scikit-learn pour projeter les codes latents dans le plan. Coloriez les points en fonction de leur catégorie dans le jeu de données.

Que constatez-vous ? Que se passe-t-il si l’on refait l’expérience avec une valeur plus élevée pour \(\beta\) dans le VAE ? Avec une valeur plus faible ?