Travaux pratiques : auto-encodeurs variationnels¶
L’objectif de cette séance de travaux pratiques est d’illustrer la construction d’un espace latent en utilisant un auto-encodeur classique, puis un auto-encodeur variationnel. (cf. Kingma et Welling ).
Nous utiliserons ensuite le VAE pour générer des données synthétiques mais plausibles vis à vis de la distribution des données d’apprentissage.
Le TP est dimensionné de sorte à pouvoir être terminé sans accélérateur
graphique pour les calculs en utilisant la base de données
Fashion-MNIST. Si vous disposez d’un GPU (par exemple, parce que vous
travaillez sur Google Colab), vous pouvez changer la valeur de la
variable use_gpu
à True
.
import torch
use_gpu = False
device = torch.device("cuda:0" if use_gpu and torch.cuda.is_available() else "cpu")
print(f"Exécution sur {device}")
# Imports des bibliothèques utiles
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from matplotlib import pyplot as plt
Préambule¶
Pour commencer, nous allons charger en mémoire les données de Fashion-MNIST et en visualiser quelques unes. Ces images sont similaires en format aux données de MNIST : 28x28 pixels en niveaux de gris.
Ce jeu de données est préintégré dans la bibliothèque torchvision
:
from torch.utils.data import DataLoader
from torchvision.datasets import FashionMNIST
from torchvision.transforms import ToTensor, ToPILImage
train_dataset = FashionMNIST(root='./data/FashionMNIST', download=True, train=True, transform=ToTensor())
test_dataset = FashionMNIST(root='./data/FashionMNIST', download=True, train=False, transform=ToTensor())
Nous pouvons visualiser quelques unes de ces images:
n_images = 5
fig = plt.figure()
for i, (image, label) in enumerate(train_dataset):
fig.add_subplot(1, n_images+1, i+1)
plt.imshow(ToPILImage()(image), cmap="gray")
plt.axis("off")
if i >= n_images:
break
plt.show()
Auto-encodeur¶
Implémentation¶
Notre premier modèle sera un auto-encodeur convolutif doté de l’architecture ci-dessous.
Pour l’encodeur :
une couche de convolution (
kernel_size=4
,in_channels=1
,out_channels=32
,stride=2
,padding=1
, activation ReLU)une couche de convolution (
kernel_size=4
,in_channels=32
,out_channels=64
,stride=2
,padding=1
, activation ReLU)une couche linéaire (
in_features=64*7*7
,out_features=latent_dimension
)
Pour le décodeur:
une couche linéaire (
in_features=latent_dimension
,out_features=64*7*7
, activation ReLU)une couche de convolution transposée (
kernel size=4
,in_channels=64
,out_channels=32
,stride=2
,padding=1
, activation ReLU)une couche de convolution transposée (
kernel size=4
,in_channels=32
,out_channels=1
,stride=2
,padding=1
, activation sigmoide)
Note
Les filtres convolutifs sont choisis de taille 4x4 afin d’éviter des problèmes d’aliasing.
Question
Compléter l’implémentation ci-dessous de l’auto-encodeur dont l’architecture vient d’être décrite. Cette implémentation utilise l’interface torch.nn.Module dont la documentation peut vous être utile.
En plus de la reconstruction par l’auto-encodeur, on souhaite que la méthode forward()
renvoie également le code intermédiaire z
(un vecteur de longueur latent_dimension
) obtenu après le passage dans le décodeur.
Indice: l’utilisation de la méthode .view()
ou de la couche nn.Flatten()
peut être utile pour ré-arranger les tenseurs avant ou après les couches linéaires. Par exemple, x.view(-1, 64, 7, 7)
permet de transformer un tenseur de dimensions (batch, 3136)
en un tenseur de dimensions (batch, 64, 7, 7)
…
Correction
class AutoEncoder(nn.Module):
def __init__(self, latent_dimension):
super(AutoEncoder, self).__init__()
self.encoder = nn.Sequential(nn.Conv2d(1, 32, kernel_size=4, stride=2, padding=1),
nn.ReLU(),
nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=1),
nn.ReLU(),
nn.Flatten(),
nn.Linear(in_features=64*7*7, out_features=latent_dimension)
)
self.decoder_linear = nn.Linear(in_features=latent_dimension, out_features=64*7*7)
self.decoder = nn.Sequential(nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1),
nn.ReLU(),
nn.ConvTranspose2d(32, 1, kernel_size=4, stride=2, padding=1),
nn.Sigmoid(),
)
def forward(self, x):
###### Votre code ici ########
z = self.encoder(x)
hat_x = F.relu(self.decoder_linear(z))
hat_x = hat_x.view(-1, 64, 7, 7)
hat_x = self.decoder(hat_x)
return hat_x, z
Entraînement¶
Une fois le modèle implémenté, nous pouvons utiliser la fonction
train
ci-dessous pour réaliser l’apprentissage. L’optimisation se
fait selon le critère choisi dans la variable criterion
(par défaut,
il s’agit de l’erreur quadratique moyenne comme critère de
reconstruction).
from tqdm.notebook import trange, tqdm
def train(net, train_dataset, epochs=10, learning_rate=1e-3, batch_size=128, device=device):
# Création du DataLoader pour charger les données
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
# Définition de l'algorithme d'optimisation (Adam, variante de la SGD)
optimizer = torch.optim.Adam(params=net.parameters(), lr=learning_rate, weight_decay=1e-5)
# Choix de la fonction de coût
criterion = nn.MSELoss()
# Passe le modèle en mode "apprentissage"
net = net.to(device)
net = net.train()
train_loss_avg = []
t = trange(1, epochs + 1, desc="Entraînement du modèle")
for epoch in t:
avg_loss = 0.
# Parcours du dataset pour une epoch
for images, _ in tqdm(train_dataloader):
# les labels sont ignorés pour l'apprentissage de l'auto-encodeur
images = images.to(device)
# Calcul de la reconstruction
reconstructions, _ = net(images)
# Calcul del'erreur
loss = criterion(reconstructions, images)
# Rétropropagation du gradient
optimizer.zero_grad()
loss.backward()
# Descente de gradient (une itération)
optimizer.step()
avg_loss += loss.item()
avg_loss /= len(train_dataloader)
t.set_description(f"Epoch {epoch}: loss = {avg_loss:.3f}")
return net
Nous pouvons créer un modèle en spécifiant la dimension de son espace latent (par exemple, 10):
latent_dimension = 10
net = AutoEncoder(latent_dimension)
Puis démarrer son entraînement (sur CPU, cette opération peut prendre jusqu’à une dizaine de minutes) :
net = train(net, train_dataset)
Visualisation des reconstructions¶
Une fois l’apprentissage terminé, nous pouvons visualiser quelques reconstructions obtenues grâce à l’auto-encodeur. Cela permet de jauger qualitativement des performances du modèle en reconstruction.
Note
La compréhension fine des fonctions make_grid
, show_grid
et visualize_reconstructions
n’est pas indispensable à la poursuite du TP.
from torchvision.utils import make_grid
net = net.eval()
test_dataloader = DataLoader(test_dataset, batch_size=128, shuffle=False)
def show_grid(grid):
plt.imshow(np.transpose(grid.numpy(), (1, 2, 0)))
plt.show()
def visualize_reconstructions(net, images, device=device):
# Mode inférence
with torch.no_grad():
images = images.to(device)
reconstructions = net(images)[0]
image_grid = make_grid(reconstructions[1:50], 10, 5).cpu()
return image_grid
images, _ = iter(test_dataloader).next()
# Images de test
plt.figure(figsize=(12, 6))
plt.title("Images du jeu de test")
show_grid(make_grid(images[1:50],10,5))
# Reconstruction et visualisation des images reconstruites
plt.figure(figsize=(12, 6))
plt.title("Reconstruction par l'auto-encodeur")
show_grid(visualize_reconstructions(net, images))
Débruitage¶
Une capacité intéressante des auto-encodeurs est leur capacité à apprendre des filtres robustes au bruit. En particulier, en bruitant légèrement une observation, on retrouve généralement la reconstruction moyenne non-bruitée. Cette propriété de débruitage est particulièrement intéressante pour l’amélioration de la qualité des signaux (images, sons, etc.).
Observons la capacité de débruitage de notre auto-encodeur sur un échantillon d’images de test.
Question
Compléter le code ci-dessous pour ajouter un bruit blanc uniforme aux images de test contenues dans le tenseur images
. Pensez à ajuster l’amplitude du bruit et à limiter les valeurs des pixels de sortie à la plage autorisée \([0,1]\) (la fonction clamp
peut vous aider).
Correction
# Bruit blanc (uniforme) centré en 0
noise = torch.rand_like(images) - 0.5
# Ajout du bruit + troncature des valeurs en dehors de [0,1]
noisy_images = torch.clamp(images + 0.5 * noise, 0, 1)
Nous pouvons alors visualiser la reconstruction des images bruitées.
# Images de test
plt.figure(figsize=(12, 6))
plt.title("Images du jeu de test bruitées")
show_grid(make_grid(noisy_images[1:50],10,5))
# Reconstruction et visualisation des images reconstruites
plt.figure(figsize=(12, 6))
plt.title("Reconstruction par l'auto-encodeur")
show_grid(visualize_reconstructions(net, noisy_images))
Question
Comparer les reconstructions des images bruitées aux reconstructions obtenues sur les images de test originales. Que constatez-vous ?
Correction
Pour des bruits de faible amplitude, les images reconstruites sont moins bruitées. Autrement dit, l’auto-encodeur « débruite » en partie les images. En revanche, lorsque l’amplitude du bruit est forte, le signal utile de l’image disparaît et la sortie devient décorrélée de l’entrée.
Auto-encodeurs variationnels¶
Implémentation¶
Nous allons à présent implémenter un VAE convolutif qui hérite de la même structure que l’auto-encodeur que nous avons précédemment défini. Pour nous simplifier les choses par la suite, nous allons commencer par séparer le sous-réseau qui définit l’encodeur de celui qui définit le décodeur.
Question: en reprenant ce qui a été fait plus haut pour l’auto-encodeur classique, compléter les implémentations ci-dessous de l’encodeur et du décodeur pour le VAE. On rappelle que, contrairement à l’auto-encodeur, la sortie de l’encodeur est double :
le vecteur
mu
qui contient la moyenne de la gaussienne dans l’espace latent,le vecteur
sigma
qui contient les variances selon les différentes directions de la gaussienne dans l’espace latent.
Ces valeurs seront les paramètres de la gaussienne associée à une observation \(x\). Ces deux vecteurs ont pour dimension la dimension de l’espace latent.
Correction
class Encoder(nn.Module):
def __init__(self, latent_dimension):
super(Encoder, self).__init__()
self.model = nn.Sequential(nn.Conv2d(1, 32, kernel_size=4, stride=2, padding=1),
nn.ReLU(),
nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=1),
nn.ReLU(),
nn.Flatten(),
)
self.linear1 = nn.Linear(in_features=64*7*7, out_features=latent_dimension)
self.linear2 = nn.Linear(in_features=64*7*7, out_features=latent_dimension)
def forward(self, x):
x = self.model(x)
x_mu = self.linear1(x)
x_logvar = self.linear2(x)
return x_mu, x_logvar
class Decoder(nn.Module):
def __init__(self, latent_dimension):
super(Decoder, self).__init__()
self.linear = nn.Linear(in_features=latent_dimension, out_features=64*7*7)
self.model = nn.Sequential(nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1),
nn.ReLU(),
nn.ConvTranspose2d(32, 1, kernel_size=4, stride=2, padding=1),
nn.Sigmoid(),
)
def forward(self, z):
hat_x = F.relu(self.linear(z))
hat_x = hat_x.view(-1, 64, 7, 7)
hat_x = self.model(hat_x)
return hat_x
Nous allons à présent combiner l’encodeur et le décodeur pour former
l’auto-encodeur variationnel complet. Il y a néanmoins une petite
subtilité car nous devons implémenter l’astuce de reparamétrisation.
Celle-ci est implémenter dans la méthode latent_sample
Lors d’un passage avant (forward
), le schéma suivant doit se
dérouler :
L’encodeur prend
x
en entrée et produit la moyennemu
et la variancelogvar
de la distribution. En pratique, on verra aussix_recon
la reconstruction dex
.On tire un échantillon aléatoire
z
dans l’espace latent à l’aide de la méthodelatent_sample
. L’échantillonnage est fait selon la distribution gaussienne latente associée à \(x\) grâce à la reparamétrisation. Lors de l’inférence, on ne réalisera pas d’échantillonnage mais on se contentera d’utiliser la moyenne de la gaussienne.L’échantillon aléatoire
z
est passé dans le décodeur de sorte à obtenir la reconstructionx_recon
.
Question
Compléter l’implémentation ci-dessous de l’auto-encodeur variationnel.
Correction
class VariationalAutoencoder(nn.Module):
def __init__(self, latent_dim):
super(VariationalAutoencoder, self).__init__()
self.encoder = Encoder(latent_dim)
self.decoder = Decoder(latent_dim)
def forward(self, x):
latent_mu, latent_logvar = self.encoder(x)
z = self.latent_sample(latent_mu, latent_logvar)
hat_x = self.decoder(z)
return hat_x, latent_mu, latent_logvar
def latent_sample(self, mu, logvar):
if self.training:
# the reparameterization trick
std = logvar.mul(0.5).exp_()
eps = torch.empty_like(std).normal_()
return eps.mul(std).add_(mu)
else:
return mu
Enfin, il reste à définir la fonction de coût du VAE. D’après le cours, on cherche à maximiser l’ELBO \(\mathcal L\). Ici, on choisira de minimiser \(-\mathcal L\) avec
La fonction de coût pour une reconstruction sur une seule donnée \(\boldsymbol x^{(i)}\) est approximée par:
où \(d\) est la taille de l’espace latent.
Note
Dans la plupart des cas, la vraisemblance est supposée gaussienne et la fonction de coût évaluant la reconstruction correspondera donc à l’erreur quadratique moyenne (F.mse_loss()
). Dans notre cas, la distribution des valeurs des pixels de Fashion-MNIST est plutôt bimodale. Les images étant à valeurs entre 0 et 1, il est possible d’utiliser une entropie croisée binaire (F.bce_loss()
) et c’est cette version qui donne les meilleurs résultats.
Le prior \(p_\theta(\boldsymbol z)\) est supposé être donné par une loi normale centrée réduite. La divergence de Kullback-Leibler est alors donnée par:
Question
Modifier la fonction vae_loss()
ci-dessous de sorte à calculer la fonction de coût du \(\beta\)-auto-encodeur variationnel (on souhaite pouvoir changer \(\beta\) facilement plus tard si besoin).
Correction
beta = 1.0
def vae_loss(hat_x, x, mu, logvar):
reconstruction_loss = F.binary_cross_entropy(hat_x.view(-1, 28*28), x.view(-1, 28*28), reduction='sum')
kl_divergence = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
return reconstruction_loss + beta * kl_divergence
Entraînement¶
Ce travail effectué, nous disposons du modèle, de la fonction objectif et des données. Il ne reste plus qu’à réaliser l’apprentissage du VAE.
from tqdm.notebook import trange, tqdm
def train_vae(net, train_dataset, epochs=10, learning_rate=1e-3, batch_size=128, device=device):
# Création du DataLoader pour charger les données
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# Définition de l'algorithme d'optimisation (Adam, variante de la SGD)
optimizer = torch.optim.Adam(params=net.parameters(), lr=learning_rate, weight_decay=1e-5)
# Choix de la fonction de coût
criterion = vae_loss
# Passe le modèle en mode "apprentissage"
net = net.to(device)
net = net.train()
t = trange(1, epochs + 1, desc="Entraînement du modèle")
for epoch in t:
avg_loss = 0.
# Parcours du dataset pour une epoch
for images, _ in tqdm(train_dataloader):
# les labels sont ignorés pour l'apprentissage de l'auto-encodeur
images = images.to(device)
# Calcul de la reconstruction
reconstructions, latent_mu, latent_logvar = net(images)
# Calcul de l'erreur
loss = criterion(reconstructions, images, latent_mu, latent_logvar)
# Rétropropagation du gradient
optimizer.zero_grad()
loss.backward()
# Descente de gradient (une itération)
optimizer.step()
avg_loss += loss.item()
avg_loss /= len(train_dataloader)
t.set_description(f"Epoch {epoch}: loss = {avg_loss:.3f}")
return net.to("cpu")
Question
Créer un modèle de VAE et lancer son apprentissage à l’aide de la fonction train_vae
ci-dessus.
Correction
vae = VariationalAutoencoder(latent_dimension)
train_vae(vae, train_dataset)
Visualisation des reconstruction¶
Comme dans le cas de l’auto-encodeur classique, nous pouvons visualiser quelques reconstructions pour juger qualitativement des performances du modèle.
vae = vae.to("cpu")
images, _ = iter(test_dataloader).next()
# Images de test
plt.figure(figsize=(12, 6))
plt.title("Images du jeu de test bruitées")
show_grid(make_grid(images[1:50],10,5))
# Reconstruction et visualisation des images reconstruites
plt.figure(figsize=(12, 6))
plt.title("Reconstruction par l'auto-encodeur")
show_grid(visualize_reconstructions(vae, images))
Echantillonnage dans l’espace latent¶
Un VAE peut générer de nouvelles données à partir de vecteur tirés dans l’espacee latent. Plus le poids donné à la KL-divergence est important (\(\beta\) élevé), plus cet espace est “rempli”, au sens où la distribution de l’espace latent se rapproche d’une loi normale centrée réduite. Cette capacité d’interpolation augmentée vient en contrepartie de reconstructions pouvant être de significativement moins bonne qualité que des auto-encodeurs classiques.
Question
Échantillonner quelques vecteurs dans l’espace latent selon une loi centrée réduite. Décoder ces vecteurs pour obtenir des nouvelles images synthétiques. Vous pouvez passer par l’attribut .decoder
de l’objet VAE pour accéder directement au décodeur.
Correction
vae.eval()
with torch.no_grad():
# Échantillonnage selon une loi normale
latent = torch.randn(100, latent_dimension, device=device)
# Reconstruction
fake_images = vae.decoder(latent).cpu()
fig = plt.figure(figsize=(12, 12))
show_grid(make_grid(fake_images[1:50],10,5))
Interpolation dans l’espace latent¶
Comme pour l’auto-encodeur classique, il est possible d’interpoler entre deux vecteurs arbitraires dans l’espace latent. Pour un auto-encodeur normal, cette interpolation produit rarement des images plausibles car l’espace latent est discontinu et contient de nombreux “trous”. En revanche, ce n’est pas le cas du VAE grâce au prior gaussien.
Question
Interpoler entre deux vecteurs aléatoires dans l’espace latent et visualiser le résultat (par exemple, 10 images sur le segment entre \(z_1\) et \(z_2\)).
Correction
z1 = torch.randn(1, latent_dimension, device=device)
z2 = torch.randn(1, latent_dimension, device=device)
n_steps = 10
fig = plt.figure(figsize=(16, 8))
for idx, alpha in enumerate(np.linspace(0, 1, n_steps + 1)):
z = (1 - alpha) * z1 + alpha * z2
with torch.no_grad():
fake_image = vae.decoder(z)[0,0,:,:].cpu().numpy()
fig.add_subplot(1, n_steps + 1, idx + 1)
plt.imshow(fake_image)
plt.title(f"$\\alpha = {alpha:0.1f}$")
plt.show()
Visualisation de l’espace latent¶
Nous pouvons comparer l’espace latent de l’auto-encodeur classique et celui de l’auto-encodeur variationnel, par exemple à l’aide de t-SNE pour projeter les codes latents du jeu de test sur un plan en deux dimensions.
Question
(optionnel, pour l’approfondissement) Pour toutes les images du jeu de test de Fashion-MNIST, calculer le code latent associé (on prendra la moyenne de la distribution dans le cas du VAE). Appliquer une réduction de dimension non-linéaire en utilisant la version de t-SNE disponible dans scikit-learn pour projeter les codes latents dans le plan. Coloriez les points en fonction de leur catégorie dans le jeu de données.
Que constatez-vous ? Que se passe-t-il si l’on refait l’expérience avec une valeur plus élevée pour \(\beta\) dans le VAE ? Avec une valeur plus faible ?