Travaux pratiques - Identification du chiffre parlé

Cette séance de travaux pratiques s’appuie, comme la précédente, sur la bibliothèque torchaudio. Nous ne ferons pas de rappel sur son utilisation. La documentation de torchaudio est accessible à l’adresse : https://pytorch.org/audio/stable/index.html

L’objectif est de reconnaître automatiquement le chiffre prononcé par un ou une locutrice. Pour ce faire, nous allons utiliser un sous-ensemble du jeu de données SpeechCommands introduit par Pete Warden de Google Brain pour l’entraînement des assistants vocaux. En l’occurrence, nous allons nous inspirer du célèbre jeu de données MNIST, qui consiste à reconnaître les chiffres manuscrits de 0 et à 9 en ne considérant qu’une partie de SpeechCommands.

Nous appellerons ce mini-jeu de données AudioMNIST pour la suite de ce TP.

# Imports des bibliothèques utiles pour la suite du TP
import matplotlib.pyplot as plt
import numpy as np
import os

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils import data

import torchaudio
from torchaudio import transforms

from tqdm.notebook import tqdm, trange

Les deux commandes ci-dessous permettre de télécharger et décompresser l’archive contenant les échantillons sonores d’AudioMNIST :

# exécuter dans un terminal
wget https://cedric.cnam.fr/vertigo/Cours/RCP217/data/audio-mnist.tar.gz
tar xzf audio-mnist.tar.gz

Nous pouvons commencer par écouter un des sons au hasard du jeu de données et visualiser sa forme d’onde ainsi que son spectrogramme.

audio, fs = torchaudio.load("./audio-mnist/data/7/baeac2ba_nohash_0.wav.ogg")
import IPython
IPython.display.Audio("./audio-mnist/data/7/baeac2ba_nohash_0.wav.ogg")
plt.plot(audio.numpy()[0]) and plt.show()

Il n’est pas nécessaire de charger un fichier sur le disque pour pouvoir le lire dans Jupyter. La fonction IPython.display.Audio accepte également les matrices NumPy : il faut toutefois alors préciser la fréquence d’échantillonnage dans le paramètre rate=.

IPython.display.Audio(audio.numpy(), rate=16000)

Question

À l’aide des fonctions de torchaudio.transforms, écrire une fonction change_volume qui modifie le gain (volume) d’un son (représenté par sa forme d’onde) de façon aléatoire par une valeur de gain comprise entre 0.5 et 1.5. On pourra utiliser torch.rand pour générer un nombre aléatoire entre 0 et 1.

Correction

def change_volume(audio):
    """
        Modifie le volume d'une forme d'onde en multipliant tous les échantillons
        par un gain G tiré aléatoirement entre 0.5 et 1.5.

        Arguments:
        ---------
        - audio (torch.Tensor): forme d'onde (CxL)

        Renvoie:
        --------
        - torch.Tensor (CxL) modifié
    """
    gain = np.random.rand() + 0.5
    t = transforms.Vol(gain)
    audio = t(audio)
    return audio

Question

Visualiser la forme d’onde et le spectogramme d’un son avant et après augmentation aléatoire du volume. Que constatez-vous ?

Correction

audio_transforme = change_volume(audio)
plt.plot(audio[0].numpy(), label="Original")
plt.plot(audio_transforme[0].numpy(), label="Après transformation")
plt.legend()
plt.show()

Par défault, la transformation sur le volume multiplie l’amplitude de la forme d’onde par la valeur du gain.

Afin d’implémenter une augmentation de données réaliste, nous n’allons pas utiliser seulement du bruit synthétique (bruit blanc ou bruit rose) mais aussi du bruit réel : bruit de pas, de vaisselle, etc.

Le sous-répertoire _background_noise contient 6 fichiers contenant du bruit (4 extraits de bruits réels et 2 bruits synthétiques) que nous allons pouvoir utiliser pour l’augmentation de données.

IPython.display.Audio("./audio-mnist/_background_noise_/doing_the_dishes.wav.ogg")

La fonction extract_random_window ci-dessous extraie aléatoirement 1 seconde de son d’un fichier passé en argument. Cela permet par exemple d’extraire une seconde d’un fichier contenant du bruit aléatoire, comme ceux du répertoire _background_noise identifiés ci-dessus.

def extract_random_window(path, duration=1):
    audio, fs = torchaudio.load(path)
    n_sample = int(duration * fs)
    begin = np.random.randint(0, audio.size(1) - n_sample)
    end = begin + n_sample
    return audio[:, begin:end]

extrait = extract_random_window("./audio-mnist/_background_noise_/dude_miaowing.wav.ogg")
print(extrait.size())
plt.plot(extrait[0].numpy()) and plt.show()

Question

Définir une fonction mix_with_noise qui prend un son en argument (représenté par sa forme d’onde) et lui ajoute un échantillon de bruit aléatoire de l’un des six fichiers de bruit :

  • doing_the_dishes.wav.ogg

  • exercise_bike.wav.ogg

  • running_tap.wav.ogg

  • dude_miaowing.wav.ogg

  • white_noise.wav.ogg

  • pink_noise.wav.ogg

Utilisez la fonction extract_random_window introduite précédemment.

Écouter et vérifier que le son produit est toujours compréhensible.

Correction

def mix_with_noise(audio):
   noisefile = np.random.choice(["doing_the_dises.wav.ogg",
                                 "dude_miaowing.wav.ogg",
                                 "exercise_bike.wav.ogg",
                                 "pink_noise.wav.ogg",
                                 "running_tap.wav.ogg",
                                 "white_noise.wav.ogg"], 1)[0]
   bruit = extract_random_window(noisefile, duration=1)
   intensite = np.random.rand() / 2 # entre 0 et 0.5
   return audio + intensite * bruit

audio_bruit = mix_with_noise(audio)
IPython.display.Audio(audio_bruit[0].numpy(), rate=16000)

Note: dans notre cas, tous les fichiers ont la même fréquence d’échantillonnage. Si ce n’était pas le cas, il faudrait être prudent et rééchantillonner le bruit à la même fréquence que l’audio considéré.

Le classe suivante est quasiment identique à celle utilisée dans le TP précédent pour manipuler le jeu de données MiniGenres. Elle définit une classe Python qui hérite de la clases torch.utils.data.Dataset.

from sklearn.preprocessing import LabelEncoder

class AudioMNIST(data.Dataset):
    """
        Cette classe implémente un jeu de données audio au format PyTorch.

        On suppose que le dossier contenant le jeu de données possède la
        structure suivante:
        dataset
        ├── classe1
        │   ├── classe1_fichier1.wav
        │   ├── classe1_fichier2.wav
        │   └── ...
        ├── classe2
        │   ├── classe2_fichier1.wav
        │   └── ...
        ├── ...
        │   ├── ...
        │   └── ...
        └── classeN
            ├── classeN_fichier1.wav
            ├── ...
            └── classeN_fichierK.wav

        La détection des fichiers répartis dans les N classes est automatique.
    """
    def __init__(self, path="./audio-mnist/data/", sample_rate=16000):
        """
            Créé un objet AudioMNIST

            Arguments
            ---------
            - path (str) : chemin vers le dossier contenant le jeu de données
                           (par défaut: ./audio_mnist)
            - sample_rate (int) : fréquence d'échantillonnage (par défaut: 16000 Hz)
        """
        super(AudioMNIST).__init__()
        filenames, labels = [], []
        if not os.path.isdir(path):
            raise ValueError(f"Le répertoire {path} est incorrect.")
        for dirname, _, files in os.walk(path):
            if dirname == path:
                continue
            filenames += sorted([os.path.join(dirname, f) for f in files])
            labels += [os.path.basename(dirname)] * len(files)
        self.label_encoder_ = LabelEncoder()
        self.filenames = filenames
        self.labels = self.label_encoder_.fit_transform(labels)
        self.classes = self.label_encoder_.classes_
        self.sample_rate = sample_rate

    def __len__(self):
        """ Renvoie la longueur du jeu de données (nombre d'échantillons sonores) """
        return len(self.filenames)

    def __getitem__(self, idx):
        """ Accède à un élément du jeu de données (son + étiquette) """
        filename, label = self.filenames[idx], self.labels[idx]
        data, sample_rate = torchaudio.load(filename)
        audio = torch.zeros((1, 16000))
        audio[:, :data.size(1)] = data
        return audio, label

Question

À quoi servent les trois dernières lignes de la méthode __getitem__ de la classe AudioMNIST ?

Correction

Ces trois lignes permettent de réaliser le zero-padding (bourrage de zéros) de sorte à ce que tous les sons aient la même durée (en l’occurrence, 1 seconde, c’est-à-dire 16000 échantillons pour une fréquence de 16kHz).

Comme dans le TP précédent, nous pouvons dorénavant créer un jeu de données au format PyTorch contenant les extraits sonores étiquetés d’AudioMNIST en construisant un objet à partir de la classe éponyme :

ds = AudioMNIST()
classes = ds.label_encoder_.classes_
idx = 14
audio, label = ds[idx]
print(f"Échantillon n°{idx} : X de dimensions {tuple(audio.size())}, y = {label} (classe {classes[label]})")

Notez qu’il est possible de lire l’audio avec Jupyter directement depuis la forme d’onde (il faut toutefois spécifier la fréquence d’échantillonnage à l’aide du paramètre rate=).

IPython.display.Audio(audio, rate=16000)

Nous allons diviser le jeu de données en 2 : 50% pour l’apprentissage, 50% pour le test. Si jamais l’entraînement est trop long sur votre machine, réduisez la proportion de données dédiées à l’apprentissage.

train_ds, test_ds = data.random_split(ds, (len(ds) // 2, len(ds) - len(ds) // 2))

CNN 2D sur le spectrogramme

Pour ce TP, nous allons réaliser la classification en utilisant un CNN 2D sur le spectrogramme. Le modèle de CNN que nous allons utiliser est simple : il s’agit d’une variante de LeNet-5 (5 couches convolutives et 2 couches entièrement connectées).

Question

Compléter la fonction get_cnn2d() pour qu’elle renvoie un CNN bidimensionnel respectant l’architecture suivante :

  • convolution 2D à 16 canaux 3x3,

  • convolution 2D à 32 canaux 3x3,

  • convolution 2D à 64 canaux 3x3,

  • convolution 2D à 64 canaux 3x3,

  • convolution 2D à 64 canaux 3x3,

  • global average pooling,

  • couche entièrement connectée 64 -> 1024,

  • Dropout (\(p=0.5\))

  • couche entièrement connectée 1024 -> 1024,

  • Dropout (\(p=0.5\))

  • couche entièrement connectée 1024 -> num_classes.

On utilisera la fonction d’activation ReLU après chaque couche où c’est nécessaire et un max pooling 2D sur une fenêtre 2x2 après chaque convolution.

On utilisera à bon escient les couches AdaptiveAvgPool2d et Flatten pour réaliser l’échantillonnage global.

Correction

def get_cnn2d(num_classes):
    net = nn.Sequential(
        nn.Conv2d(1, 16, kernel_size=3),
        nn.MaxPool2d(kernel_size=2, stride=2),
        nn.ReLU(),
        nn.Conv2d(16, 32, kernel_size=3),
        nn.MaxPool2d(kernel_size=2, stride=2),
        nn.ReLU(),
        nn.Conv2d(32, 64, kernel_size=3),
        nn.MaxPool2d(kernel_size=2, stride=2),
        nn.ReLU(),
        nn.Conv2d(64, 64, kernel_size=3),
        nn.MaxPool2d(kernel_size=2, stride=2),
        nn.ReLU(),
        nn.Conv2d(64, 64, kernel_size=3),
        nn.ReLU(),
        nn.AdaptiveAvgPool2d(output_size=(1,1)),
        nn.Flatten(),
        nn.Linear(64, 1024),
        nn.Dropout(),
        nn.ReLU(),
        nn.Linear(1024, 1024),
        nn.Dropout(),
        nn.ReLU(),
        nn.Linear(1024, num_classes)
    )
    return net
def train(model, train_ds, epochs, lr=0.001, bs=16, device="cpu"):
    # Déplace le modèle sur GPU si besoin
    model = model.to(device)
    # Optimisation : descente de gradient avec moment
    optimizer = optim.Adam(model.parameters(), lr=lr)
    # Transformation forme d'onde -> spectrogramme
    spectrogram = transforms.Spectrogram(n_fft=400)
    # Créé le DataLoader
    train_data = data.DataLoader(train_ds, batch_size=bs, shuffle=True, num_workers=1)

    for e in trange(1, epochs+1, desc="Apprentissage"):
        # On met le modèle en mode `train`
        model = model.train()
        average = 0.

        for data_, labels in tqdm(train_data, desc=f"Epoch {e}"): # charge les données
            data_, labels = data_.to(device), labels.to(device)
            optimizer.zero_grad() # réinitialise les gradients à 0
            data_ = spectrogram(data_) # calcul du spectrogramme
            output = model(data_) # calcul de sprédictions
            loss = F.cross_entropy(output, labels) # calcul de la loss
            average += loss.item() # calcul de l'erreur moyenne
            #tqdm.write(f"{loss.item()}")
            loss.backward() # rétroprogation
            optimizer.step() # descente de gradient
        tqdm.write(f"Erreur à l'epoch {e} = {average/len(train_data)}")

    return model

Question

Utilisez la fonction train ci-dessus pour entraîner le modèle de CNN 2D sur le jeu d’apprentissage de AudioMNIST pendant 20 epochs avec un learning rate de 0.001.

Note: si c’est trop long, diminuez la taille du jeu de données d’apprentissage en changeant les proportions du random_split plus haut.

# On construit le modèle
model = get_cnn2d(len(ds.classes))
# 20 époques suffisent pour notre problème
epochs = 20
model = train(model, train_ds, epochs)

Comme dans le TP précédent, la fonction eval permet de calculer les métriques sur le jeu de test (matrice de confusion et accuracy).

from sklearn.metrics import accuracy_score, confusion_matrix

def eval(model, test_ds, bs=64):
    model = model.eval()
    test_loader = data.DataLoader(test_ds, batch_size=bs, num_workers=1)
    spectrogram = transforms.Spectrogram(n_fft=400)
    y_true, y_pred = [], []
    for batch, labels in tqdm(test_loader, desc="Évaluation"):
        with torch.no_grad():
            batch = spectrogram(batch)
            outputs = model(batch)
        predictions = torch.argmax(outputs, dim=1)
        y_pred.append(predictions.to("cpu").numpy())
        y_true.append(labels.to("cpu").numpy())
    y_pred = np.concatenate(y_pred)
    y_true = np.concatenate(y_true)
    acc = accuracy_score(y_true, y_pred)
    cm = confusion_matrix(y_true, y_pred)
    return {"y_pred": y_pred, "y_true": y_true, "accuracy": acc, "cm": cm}
results = eval(model, test_ds)
plt.matshow(results["cm"])
print(results["accuracy"])

Visualisation des caractéristiques apprises

Maintenant que notre modèle est entraîné, nous allons chercher à visualiser les caractéristiques apprises afin de vérifier ques les différents chiffres sont bien séparés dans l’espace de représentation.

Pour ce faire, nous commençons par construire un modèle qui ne réalise que l’extraction des caractéristiques et pas la classification.

feat_extractor = nn.Sequential(*list(model.children())[:16])

Question

À quoi correspond list(model.children()) (essayer dans une nouvelle cellule) ? Pourquoi ne gardons nous que les 16 premiers éléments ?

Correction

.children() correspond aux couches du réseau de neurones défini avec l’interface séquentielle. On ne conserve ici que les 16 premières couches, c’est-à-dire celles qui correspondent aux couches convolutives.

Nous allons boucler sur les échantillons pour convertir les spectrogrammes en feature maps du CNN2D.

spectrogram = transforms.Spectrogram()
X, y = [], []
for audio, l in tqdm(test_ds):
    with torch.no_grad():
        X.append(feat_extractor(spectrogram(audio).unsqueeze(0)).numpy()[0])
    y.append(l)
X, y = np.array(X), np.array(y)

Question

Utilisez l’algorithme de réduction de dimension t-SNE pour projeter les caractéristiques extraites dans un plan à deux dimensions. Vous pouvez à cette fin avoir besoin de la documentation de t-SNE dans scikit-learn. Visualisez les caractéristiques projetées en les colorant par leurs étiquettes réelles.

Que constatez-vous ?

Correction

from sklearn.manifold import TSNE
dr = TSNE(n_components=2)
proj = dr.fit_transform(X)
fig, ax = plt.subplots(figsize=(12, 12))
scatter = plt.scatter(proj[:,0], proj[:,1], c=y, cmap=plt.cm.Set3, label=y)
legend1 = ax.legend(*scatter.legend_elements(),
                      loc="lower left", title="Classes")
ax.add_artist(legend1)
plt.show()

Le modèle sépare raisonnablement bien les différentes classes. Il y a chevauchement entre les classes aux endroits correspondant aux erreurs indiquées dans la matrice de confusion.

Question

Refaire l’entraînement du modèle mais en appliquant cette fois-ci l’augmentation de données consistant à mélanger le chiffre parlé avec un bruit de fond aléatoire. Vous pouvez appliquer l’augmentation de données dans l’objet AudioMNIST ou bien dans la fonction train directement.

Calculez à nouveau les métriques (accuracy et matrice de confusion) et faites une nouvelle visualisation t-SNE des caractéristiques apprises par ce deuxième modèle.

Que constatez-vous ?

Pour aller plus loin : séparation de sources avec ConvTasNet

Cette partie optionnelle montre l’utilisation en pratique d’un modèle de séparation de sources tel quel ConvTasNet. Nous ne nous intéresserons pas à l’entraînement d’un tel modèle (qui constituerait un projet en soi) mais simplement à illustrer son usage sur des mélanges synthétiques.

Tout d’abord il sera nécessaire d’installer deux dépendances supplémentaires afin d’utiliser un modèle ConvTasNet préentraîné. Afin de tout devoir refaire par nous-mêmes, nous allons utiliser la librairie Asteroid développée par Manuel Parente, chercheur à l’INRIA.

# dans un terminal
pip install soundfile asteroid

La commande suivante de la bibliothèque Asteroid permet de télécharger et de charger automatiquement en mémoire un modèle ConvTasNet préentraîné pour la séparation de 2 sources sur le jeu de données LibriMix (se référer à la documentation d’Asteroid pour plus de détails). Le modèle précis que nous allons utiliser a été entraîné par Joris Cosentino, vous pouvez trouver les détails ici.

from asteroid.models import ConvTasNet
tasnet = ConvTasNet.from_pretrained('Cosentino/ConvTasNet_LibriMix_sep_clean')

Dans un premier temps, essayons de voir ce qui se passe sur nous donnons un de nos chiffres parlés au modèle. Nous savons qu’il n’y a qu’une seule source, le modèle devrait donc produire une source parfaitement silencieuse et une source identique à l’entrée.

Attention, ConvTasNet a été entraîné sur des données échantillonnés à 8kHz et nos sons sont échantillonnés à 16kHz, il faut donc que nous les rééchantillonions.

Nous pouvons écouter le son initial :

idx = 4446
audio, _ = ds[idx] # charge l'élément idx du dataset
IPython.display.Audio(audio.numpy()[0], rate=16000)

Puis nous le rééchantillonnons avant de réaliser la séparation de sources avec ConvTasNet.

resample = transforms.Resample(16000, 8000)
audio = resample(audio)

with torch.no_grad(): # inférence (pas de calcul du gradient)
    # unsqueeze(0) permet d'ajouter une dimension singleton pour la dimension du batch
    s1, s2 = tasnet(audio.unsqueeze(0)).numpy()[0]
plt.plot(s1, alpha=0.6, label="Source 1")
plt.plot(s2, alpha=0.6, label="Source 2")
plt.legend()
plt.show()

Nous pouvons écouter la source 1 :

IPython.display.Audio(s1, rate=8000)

Et la source 2 :

IPython.display.Audio(s2, rate=8000)

Question

Que constatez-vous ?

Correction

Le modèle produit une source silencieuse (ou presque) et une source quasiment identique au chiffre parlé. C’est ce que l’on souhaite.

Question

Chargez deux formes d’onde au hasard dans ds et additionnez-les. Il est préférable de choisir des chiffres différents prononcés par des voix assez distinctes. Écoutez le son produit.

idx1, idx2 = 25, 10024
audio1, label1 = ds[idx1]
audio2, label2 = ds[idx2]
print(f"Échantillon n°{idx1} : X de dimensions {tuple(audio1.size())}, y = {label1}")
print(f"Échantillon n°{idx2} : X de dimensions {tuple(audio2.size())}, y = {label2}")
somme = audio1 + audio2
IPython.display.Audio(somme.numpy()[0], rate=16000)

Question

Appliquez le modèle ConvTasNet préentraîné sur la forme d’onde mélangée (contenant la somme des deux chiffres prononcés). Écoutez les deux sons produits par le modèle. Attention: n’oubliez pas que ConvTasNet a été entraîné sur des formes d’ondes échantillonnées à 8000 Hz.

Correction

resample = transforms.Resample(16000, 8000)
somme = resample(somme)
with torch.no_grad():
    s1, s2 = tasnet(somme.unsqueeze(0)).numpy()[0]
IPython.display.Audio(s1, rate=8000)
IPython.display.Audio(s2, rate=8000)