Travaux pratiques : introduction aux modèles génératifs¶
L’objectif de cette séance de TP est de réaliser un bref tour d’horizon de trois types de modèles génératifs simples:
modèles de mélanges gaussiens,
chaînes de Markov,
autoencodeurs.
La séance est divisée en 3 exercices indépendants pouvant être traités dans le désordre.
Exercice 1 : Modèles de mélanges gaussiens¶
Ce premier exercice est une application directe des modèles de mélanges gaussiens. L’objectif est d’illustrer le fonctionnement de l’échantillonnage de nouvelles données à partir de la densité estimée.
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
Pour cet exercice, nous allons travailler sur un nuage de points synthétique qui est généré à partir d’une sinusoïde. Les observations \(X\) sont bi-dimensionnelles avec:
avec:
\(x_1\) une variable aléatoire tirée uniformément dans \([0, 10]\),
\(\epsilon\) un bruit aléatoire tiré d’une loi normale \(\mathcal{N}(0, 0.1)\).
n_samples = 300
x1 = 10 * np.random.rand(n_samples)
x2 = np.sin(x1) + 0.1*np.random.randn(n_samples)
X = np.vstack((x1, x2))
X = np.transpose(X, (1, 0))
# Affichage du nuage de point
plt.scatter(X[:,0], X[:,1], s=10)
plt.title
plt.show()
Question
Appliquer un modèle de mélange gaussien à la matrice d’observations \(X\). On choisira de façon empirique un nombre de composantes qui semble adapté au nuage de points.
Correction
L’idéal serait d’utiliser le critère AIC ou BIC pour déterminer le nombre de composantes. En première approximation, considérons 6 composantes:
from sklearn.mixture import GaussianMixture
gmm = GaussianMixture(n_components=6)
gmm.fit(X)
def make_ellipses(gmm, ax):
colors = ['navy', 'firebrick', 'darkorange', 'dimgrey', 'forestgreen',
'purple', 'turquoise', 'yellow', 'hotpink', 'springgreen']
for n, color in enumerate(colors[:gmm.n_components]):
if gmm.covariance_type == 'full':
covariances = gmm.covariances_[n][:2, :2]
elif gmm.covariance_type == 'tied':
covariances = gmm.covariances_[:2, :2]
elif gmm.covariance_type == 'diag':
covariances = np.diag(gmm.covariances_[n][:2])
elif gmm.covariance_type == 'spherical':
covariances = np.eye(gmm.means_.shape[1]) * gmm.covariances_[n]
v, w = np.linalg.eigh(covariances)
u = w[0] / np.linalg.norm(w[0])
angle = np.arctan2(u[1], u[0])
angle = 180 * angle / np.pi # convert to degrees
v = 2. * np.sqrt(2.) * np.sqrt(v)
ell = mpl.patches.Ellipse(gmm.means_[n, :2], v[0], v[1],
180 + angle, color=color)
ell.set_clip_box(ax.bbox)
ell.set_alpha(0.5)
ax.add_artist(ell)
ax.text(*(gmm.means_[n, :2]+0.1), str(n), fontsize=14)
Le code ci-dessous permet d’afficher les nuage de points et les ellipses correspondants aux gaussiennes (centrées sur la moyenne et d’axes égaux aux covariances) en surimpression.
Note: il n’est pas indispensable de comprendre les détails de la
fonction make_ellipses
pour ce TP, celle-ci faisant appel à des
fonctionnalités avancées de la bibliothèque graphique matplotlib
.
ax = plt.axes()
make_ellipses(gmm, ax)
ax.scatter(X[:, 0], X[:, 1], s=3)
ax.set_xlim(0, 10)
ax.set_ylim((-1.5, 1.5))
plt.tight_layout()
plt.show()
Question
Tirer aléatoirement 200 points dans le modèle de mélange. Afficher le résultat.
Indice: utiliser une des méthodes de l’objet GaussianMixture.
Correction
sklearn
intègre la méthode sample()
pour échantillonner dans le mélange :
X_samples = gmm.sample(200)[0]
plt.scatter(X[:, 0], X[:, 1], s=3, alpha=0.5)
plt.scatter(X_samples[:, 0], X_samples[:, 1], marker='x', s=20)
Question
Tirer aléatoirement 100 points dans la première composante du mélange. Afficher le résultat.
Indice: la moyenne et la matrice de variance-covariance de la composante numéro \(i\) du mélange sont stockées respectivement dans gmm.means_[i]
et gmm.covariances_[i]
. La fonction multivariate_normal
de NumPy peut être utile.
Correction
Chaque composante du mélange est une gaussienne multivariée dont la moyenne et la matrice de covariance sont estimées lors de l’apprentissage. Pour échantillonner selon une composante précise, il suffit donc de tirer selon cette gaussienne :
i = 0 # numéro de la composante souhaitée
X_samples = np.random.multivariate_normal(gmm.means_[i], gmm.covariances_[i], size=100)
plt.scatter(X[:, 0], X[:, 1], s=3, alpha=0.5)
plt.scatter(X_samples[:, 0], X_samples[:, 1], marker='x', s=20)
Exercice 2 : Autoencodeurs et génération d’images¶
Cet exercice présente l’utilisation des autoencodeurs avec PyTorch. Nous allons utiliser un modèle simple, entièrement connecté, permettant de compresser une image en une représentation vectorielle.
Cet exercice utilise MNIST (ou FashionMNIST) comme jeu de données.
# Imports liés à torch
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
# Matplotlib
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm, trange
mnist = torchvision.datasets.FashionMNIST("data/", download=True, transform=torchvision.transforms.ToTensor())
Question
Visualiser les premières images du jeu de données.
Correction
fig = plt.figure()
for idx, (img, label) in enumerate(mnist):
if idx > 4:
break
fig.add_subplot(1,5,idx+1)
plt.imshow(img[0].numpy())
plt.axis('off')
plt.title(label)
plt.show()
Question
En utilisant l’interface nn.Sequential
de PyTorch, écrire le code qui définit un modèle autoencodeur entièrement connecté.
Ce modèle prend en entrée une image de dimensions \(28\times28\) sous forme d’un vecteur aplati de longeur \(784\). On définira une variable hidden_state
qui permet de contrôler la taille du code \(z\) en sortie de l’encodeur.
Le décodeur devra prendre en entrée un code \(z\) de longueur hidden_state
et produire un vecteur aplati de longueur \(28\times28 = 784\) (identique à l’image). On choisira une valeur raisonnable pour la dimension du code (par exemple, entre 30 et 250).
Correction
On se borne ici à un auto-encodeur « simple », c’est-à-dire une combinaison de deux perceptrons multi-couches. On pourrait aussi utiliser un encodeur convolutif.
hidden_state = 128
encoder = nn.Sequential(
nn.Linear(28*28, 1024),
nn.ReLU(),
nn.Linear(1024, 256),
nn.ReLU(),
nn.Linear(256, hidden_state)
)
decoder = nn.Sequential(
nn.Linear(hidden_state, 256),
nn.ReLU(),
nn.Linear(256, 1024),
nn.ReLU(),
nn.Linear(1024, 28*28),
nn.Sigmoid(),
)
autoencoder = nn.Sequential(encoder, decoder)
Question
Y a-t-il un intérêt à utiliser une fonction de non-linéarité en sortie du décodeur? Justifier.
Correction
Lorsque l’espace de sortie (et donc l’espace d’entrée) n’est pas bornée, il n’est généralement contre-productif d’appliquer une non-linéarité en sortie du décodeur. Par exemple, si l’on souhaite reconstruire des valeurs réelles positives et négatives, il ne faut surtout pas appliquer de ReLU (qui va interdire les valeurs négatives).
Dans notre cas, les valeurs des pixels sont cependant bornées entre 0 et 1. Il est possible que le décodeur produise des valeurs légèrement négatives ou légèrement supérieures à 1, ce qui produira des artefacts dans l’image si on ne les enlève pas (clipping). Une solution élégante est d’appliquer une non-linéarité sigmoïde qui va contraindre les valeurs des pixels dans la plage (0,1) désirée.
def train(model, dataset, epochs=20, device="cpu"):
dataloader = torch.utils.data.DataLoader(dataset, batch_size=64, shuffle=True, num_workers=1)
criterion = nn.L1Loss()
optimizer = torch.optim.Adam(model.parameters())
flatten = nn.Flatten()
model = model.to(device)
model.train()
t = trange(1, epochs + 1, desc="Entraînement du modèle")
for e in t:
avg_loss = 0.
for batch, _ in tqdm(dataloader, leave=False):
batch = batch.to(device)
model.zero_grad()
batch = flatten(batch)
reconstruction = model(batch)
loss = criterion(reconstruction, batch)
loss.backward()
optimizer.step()
avg_loss += loss.item()
avg_loss /= len(dataloader)
t.set_description(f"Epoch {e}: loss = {avg_loss:.3f}")
return model
Question
Pourquoi privilégie-t-on dans ce cas précis la fonction de perte \(L_1\) (mean absolute error) plutôt que de la fonction de perte \(L_2\) (mean squared error) ?
Correction
L’erreur quadratique moyenne « écrase » les petites valeurs de l’erreur (inférieures à 1) à cause du carré. Dans notre cas, les valeurs des pixels sont entre 0 et 1. Pour avoir un gradient plus élevé même pour des erreurs faibles, on utilise donc plutôt l’erreur absolue.
Question
En utilisant la fonction train()
définie ci-dessus, réaliser l’apprentissage de l’autoencodeur défini précédemment.
Correction
train(autoencoder, mnist, epochs=10, device="cpu")
Question
Pour quelques images du jeu de données, visualiser l’image originale et sa reconstruction.
Correction
data, _ = mnist[444]
plt.imshow(data.numpy()[0]) and plt.show()
with torch.no_grad():
reconst = autoencoder(data.reshape(1, -1)).numpy()[0].reshape(28, 28)
plt.imshow(reconst) and plt.show()
Question
Choisir deux images du jeu de données de classes différentes. Calculer leurs codes \(z_1\) et \(z_2\) à l’aide de l’encodeur. Afficher la reconstruction du code moyen \(0,5 z_1 + 0,5 z_2\). Que constatez-vous ?
Correction
flatten = nn.Flatten()
c1 = encoder(flatten(mnist[444][0]))
c2 = encoder(flatten(mnist[3][0]))
fig = plt.figure(figsize=(12, 6))
alphas = np.linspace(0, 1, 10)
for idx, alpha in enumerate(alphas):
fig.add_subplot(1, len(alphas), idx+1)
c = alpha * c1 + (1 - alpha) * c2
with torch.no_grad():
img = decoder(c)
img = img.numpy()[0].reshape(28, 28)
plt.imshow(img)
plt.show()
La reconstruction du code interpolé n’est pas particulièrement réaliste et correspond à un mélange flou des deux images.
Question
Afficher les résultats des reconstructions de l’interpolation entre \(z_1\) et \(z_2\) pour différentes valeurs du facteur d’interpolation \(\alpha\).
Correction
c1 = encoder(flatten(mnist[6][0]))
c2 = encoder(flatten(mnist[8][0]))
fig = plt.figure(figsize=(12, 6))
alphas = np.linspace(0, 1, 10)
for idx, alpha in enumerate(alphas):
fig.add_subplot(1, len(alphas), idx+1)
c = alpha * c1 + (1 - alpha) * c2
with torch.no_grad():
img = decoder(c)
img = img.numpy()[0].reshape(28, 28)
plt.imshow(img)
plt.show()
On remarque que l’interpolation donne des résultats peu plausibles. Lorsque l’on s’éloigne des petites valeurs de \(\alpha\). L’espace latent de l’auto-encodeur n’est pas aussi régulier qu’espéré.
Question
Construire un nouveau jeu de données contenant:
une matrice
codes
, qui contient pour chaque ligne \(i\) le code \(z_i\) obtenu en encodant l’image \(i\) du jeu de donnéesmnist
,une liste
labels
dont l’élément \(i\) représente l’étiquette de l’image \(i\) du jeu de donnéesmnist
.
Appliquer une réduction de dimension non-linéaire par l’algorithme t-SNE sur cette matrice de codes de sorte à pouvoir afficher un nuage de points dans le plan. On prendra soin de colorer chaque point en fonction de son étiquette.
Que constatez-vous ?
Correction
Sans aucune supervision, l’espace latent de l’auto-encodeur permet de relativement bien séparer les différentes classes d’objets avec une dimensionalité nettement inférieure à celle des données d’origine.
codes, labels = [], []
for data, label in tqdm(mnist):
with torch.no_grad():
code = encoder(flatten(data))[0]
codes.append(code.numpy())
labels.append(label)
codes = np.array(codes)
labels = np.array(labels)
from sklearn.manifold import TSNE
proj = TSNE(n_components=2)
code_projections = proj.fit_transform(codes)
fig, ax = plt.subplots(figsize=(12, 12))
scatter = plt.scatter(code_projections[:, 0], code_projections[:, 1], c=labels, cmap=plt.cm.Set1)
legend1 = ax.legend(*scatter.legend_elements(),
loc="lower left", title="Classes")
ax.add_artist(legend1)
plt.show()
Question
Approfondissement (ne faites cette question qu’à la fin si vous avez encore du temps)
Réaliser la même visualisation par t-SNE en utilisant non pas le code intermédiaire mais directement l’image aplatie. Que constatez-vous? Pourquoi ne peut-on pas utiliser cette approche dans le cas général?
Correction
Le t-SNE appliqué directement sur l’image sépare encore mieux les données. On peut supposer que la réduction de dimension dans l’auto-encodeur nous a fait perdre un peu d’information. Dans le cas général, ce n’est pas possible d’appliquer t-SNE sur les données brutes : la dimensionalité des données est bien trop élevée. Cela ne fonctionne ici que parce que les images sont petites (\(28 \times 28\)).
codes, labels = [], []
for data, label in tqdm(mnist):
codes.append(data.reshape(-1).numpy())
labels.append(label)
codes = np.array(codes)
labels = np.array(labels)
from sklearn.manifold import TSNE
proj = TSNE(n_components=2)
code_projections = proj.fit_transform(codes)
fig, ax = plt.subplots(figsize=(12, 12))
scatter = plt.scatter(code_projections[:, 0], code_projections[:, 1], c=labels, cmap=plt.cm.Set1)
legend1 = ax.legend(*scatter.legend_elements(),
loc="lower left", title="Classes")
ax.add_artist(legend1)
plt.show()
Exercice 3 : chaîne de Markov pour la génération de texte¶
L’objectif de cet exercice est d’implémenter en Python pur une chaîne de Markov simple. Rappelons qu’une chaîne de Markov représente une séquence \((X_t)\) où l’on s’intéresse à modéliser les probabilités de transitions qui permettent de passer d’un état \(i\) à un état \(j\), c’est-à-dire à la matrice:
Pour cet exercice, nous allons utiliser un corpus textuel correspondant à la traduction française du roman Frankenstein ou le Prométhée moderne de l’écrivaine Mary Shelley (1797-1851).
La modélisation du problème est la suivante:
un mot représente un état \(i\) de la chaîne,
on considère la fréquence des bi-grammes de mots comme probabilités de transition.
Pour être plus précis, on considérera que l’ensemble \(E\) des états possibles correspond à un dictionnaire (fini) de mots. Pour chaque bi-gramme, c’est-à-dire chaque paire de mots \((X_t = i, X_{t+1} = j)\), nous allons calculer son nombre d’occurrences \(N_{i,j}\).
Pour un mot donné \(i\), sa probabilité de transition vers le mot suivant \(j\) correspond à la probabilité d’occurrence du bigramme \((i,j)\), c’est-à-dire :
Observons que le cas des chaînes de Markov d’ordre 2 et supérieur correspond à s’intéresser aux n-grammes pour \(n>2\).
Note: les mots du dictionnaire \(E\) sont à prendre au sens large. En particulier, on considérera que les symboles de ponctuation (?, !, …, etc.) constituent des mots à part entière dans le dictionnaire.
Afin de débuter cet exercice, commençons par examiner le corpus et créer le dictionnaire d’états \(E\). Pour ce faire, nous transformer la chaîne de caractères qui constitue l’intégralité du texte en une séquence de mots. Cette opération s’appelle la tokenization (ou segmentation). Elle s’appuie sur des règles de grammaire propres à chaque langage.
Note: Le fonctionnement précis de ce processus est hors-sujet pour ce cours, se référer au module de traitement automatique du langage de RCP217 pour plus de détails.
Nous allons utiliser la bibliothèque de traitement du langage nltk
pour réaliser la tokenization:
import numpy as np
from nltk.tokenize import word_tokenize
import nltk
nltk.download('punkt')
import requests
# Téléchargement du fichier
r = requests.get("https://cedric.cnam.fr/vertigo/Cours/RCP211/data/frankenstein.txt")
with open("frankenstein.txt", "wb") as f:
f.write(r.content)
with open("frankenstein.txt") as f:
corpus = word_tokenize(f.read(), language='french')
Nous pouvons désormais afficher le début du corpus:
" ".join(corpus[:50])
Question
Combien de mots uniques ce corpus contient-il ?
Correction
words = np.unique(corpus)
word_index = {word: idx for idx, word in enumerate(words)}
len(words)
Afin de créer une chaîne de Markov d’ordre 1 sur ces données, il nous faut produire la liste de tous les bi-grammes contenus dans le texte. Nous pouvons les générer avec la fonction suivante :
def make_pairs(corpus):
for i in range(len(corpus) - 1):
yield (corpus[i], corpus[i+1])
Nous allons utiliser l’algorithme suivant pour construire la matrice de transition:
Créer un dictionnaire d vide
Pour chaque paire (i, j) du corpus:
Si d n'a pas d'entrée pour i:
d[i] = dictionnaire vide
Si d[i] n'a pas d'entrée pour j:
d[i][j] = 1
Sinon:
d[i][j] += 1
Renvoyer d
Remarquons que cet algorithme ne construit pas exactement la matrice de
transition. Elle produit un double dictionnaire (c’est-à-dire un
dictionnaire dont les éléments sont également des dictionnaires) tel que
d[mot1][mot2]
renvoie le nombre d’occurrences du bi-gramme (mot1,
mot2) dans le corpus.
Question
Écrire une fonction build_transition_matrix
qui implémente cet algorithme. Les paires sont calculées à l’aide de la fonction make_pairs
.
Correction
def build_transition_matrix(corpus):
"""
Construit un double dictionnaire d tel que:
d[mot1][mot2] = nombre d'occurrences du bi-gramme (mot1, mot2) dans le corpus
Arguments
---------
corpus (str list): une liste de mots
Renvoie
-------
dict: double dictionnaire des nombres d'occurrences des bi-grammes
"""
pairs = make_pairs(corpus)
transition_matrix = {}
for word1, word2 in pairs:
if word1 not in transition_matrix.keys():
transition_matrix[word1] = {}
if word2 not in transition_matrix[word1].keys():
transition_matrix[word1][word2] = 1
else:
transition_matrix[word1][word2] += 1
return transition_matrix
Question
Pourquoi utiliser un dictionnaire plutôt qu’une matrice (dense) de transition ?
Correction
La matrice dense de transition devrait contenir une entrée pour chaque bi-gramme, c’est-à-dire \(N\times N\) entrées. Vu le nombre de mots unique du dictionnaire, cette matrice occuperait beaucoup de mémoire. L’utilisation du dictionnaire permet de ne stocker réellement que les bi-grammes qui existent (et donc d’ignorer tous les 0 de la matrice de transition).
Nous pouvons désormais calculer la “matrice de transition”:
transition_matrix = build_transition_matrix(corpus)
Question
Afficher les 10 bi-grammes les plus fréquents.
Correction
from collections import defaultdict
bigrams = defaultdict(lambda: 0)
for w1, d in transition_matrix.items():
for w2, c in d.items():
bigrams[w1, w2] = c
sorted(bigrams.items(), key=lambda x: -x[1])
Question
Afficher les probabilités de transition pour tous les bi-grammes commençant par “une”.
Correction
transition_matrix["une"]
Question
Écrire une fonction sample_next_word(word, transition_matrix)
qui, à partir d’un mot word
et de la matrice de transition tire au hasard le mot suivant en respectant les probabilités de transition \(p_{i,j}\).
Indice: on pourra utiliser à bon escient la fonction np.random.choice
de NumPy.
Correction
def sample_next_word(word, transition_matrix):
"""
Renvoie un mot au hasard complétant la séquence en suivant les probabilités de transition
Arguments
---------
- word (str): un mot (première moitié du bi-gramme)
- transition_matrix (dict): double dictionnaire du nombre d'occurrences
Renvoie
-------
- str: un mot complétant le bi-gramme
"""
# Dictionnaire des transitions pour "word"
transitions = transition_matrix[word]
# Bi-grammes possibles
candidates = np.array(list(transitions.keys()))
# Nombre d'occurrence des bi-grammes commençant par "word"
weights = np.array(list(transitions.values()))
# Tirage aléatoire
next_word = np.random.choice(candidates,
p=weights/sum(weights))
return next_word
Question
Utiliser cette fonction pour générer une phrase commençant par “Une”.
Correction
w = "Une"
sentence = [w]
while w != ".":
w = sample_next_word(w, transition_matrix)
sentence.append(w)
print(" ".join(sentence))
Question
Approfondissement (ne faites cette question que si vous avez le temps)
Réaliser le même travail mais dans le cas de la chaîne de Markov d’ordre 2 (raisonner sur des triplets plutôt que des pairs).
Indice: on pourra utiliser defaultdict pour écrire plus simplement la fonction de construction de la matrice de transition.
Correction
def make_triplets(corpus):
for i in range(len(corpus) - 2):
yield (corpus[i], corpus[i+1], corpus[i+2])
from collections import defaultdict
def build_transition_matrix_triplets(triplets):
triplets = make_triplets(corpus)
transition_matrix = defaultdict(lambda: defaultdict(lambda: 0))
for word1, word2, word3 in triplets:
transition_matrix[word1, word2][word3] += 1
return transition_matrix
triplet_transition_matrix = build_transition_matrix_triplets(corpus)
def sample_next_word_triplets(word1, word2, transition_matrix):
# Dictionnaire des transitions pour "word1", "word2"
transitions = transition_matrix[word1, word2]
# Bi-grammes possibles
candidates = np.array(list(transitions.keys()))
# Nombre d'occurrence des tri-grammes commençant par "word1", "word2"
weights = np.array(list(transitions.values()))
# Tirage aléatoire
next_word = np.random.choice(candidates,
p=weights/sum(weights))
return next_word
w1, w2 = "un", "homme"
sentence = [w1, w2]
while w2 != ".":
w = sample_next_word_triplets(w1, w2, triplet_transition_matrix)
w1 = w2
w2 = w
sentence.append(w2)
print(" ".join(sentence))