Travaux pratiques - Echantillonnage. Analyse en composantes principales

(une variante de ce TP utilisant le langage Scala)

Références externes utiles :

L’objectif de cette séance de TP est de présenter les fonctionnalités proposées par Spark pour l’échantillonnage des données, ainsi que l’utilisation de l’analyse en composantes principales (ACP) dans Spark.

Les exemples ci-dessous reprennent, en partie, ceux des documentations en ligne.

Échantillonnage

Lecture des données

Nous travaillerons lors de cette séance sur des données Spambase Data Set issues de l’archive de l’UCI. Il s’agit d’une collection de courriels étiquetés comme spam (indésirable) ou normal. Vous trouverez sur le site de l’UCI des explications plus détaillées concernant ces données, regardez-les, notamment la signification des variables et les classes auxquelles appartiennent les données.

Pour préparer les répertoires et télécharger les données, ouvrez une fenêtre terminal et entrez les commandes suivantes :

%%bash
mkdir -p tpacp/data
wget -nc https://archive.ics.uci.edu/ml/machine-learning-databases/spambase/spambase.data -P tpacp/data/
pyspark

Sous Windows, créez le répertoire tpacp et le sous-répertoire data, téléchargez le fichier de données dans le sous-répertoire data, ouvrez une fenêtre invite de commandes et lancez pyspark.

Dans le fichier spambase.data, chaque ligne contient un vecteur de 57 valeurs float et ensuite une étiquette de classe (0 ou 1). Nous allons maintenant importer ces données dans Spark.

from pyspark.sql.types import DoubleType, StructType, StructField

# Génération des StructField correspondant aux 57 premières colonnes
spamFields = [StructField("val"+str(i), DoubleType(), True) for i in range(0, 57)]

# Construction du schéma complet du Dataframe (incluant la dernière colonne)
spamSchema = StructType(spamFields).add("label", DoubleType(), True)

# Lecture des données et création du Dataframe
spamDF = spark.read.format("csv").schema(spamSchema).load("tpacp/data/spambase.data").cache()
spamDF.printSchema()    # vérification

Nous nous servirons du DataFrame spamDF dans la suite de cette séance.

Question :

Expliquez la construction de spamSchema.

Correction :

Un schéma StructType de DataFrame est une succession de StructField. Dans les données de spambase.data, les 57 premières colonnes sont des float et la 58ème est un int correspondant à l’étiquette de classe (0 signifie non spam, 1 signifie spam). Une compréhension de liste produit ensuite une séquence de 57 StructField avec des noms "val...", de type DoubleType et dont des valeurs peuvent être absentes (signification de True). Cette séquence est convertie en StructType et on lui ajoute à la fin le StructField correspondant à la dernière colonne qui est l’étiquette de classe (le type DoubleType est conservé pour des considérations pratiques).

Échantillonnage simple

Nous allons d’abord générer deux DataFrame, spamEch1 et spamEch2, qui sont des échantillons du DataFrame spamDF (réduit aux 3 premières colonnes) avec deux valeurs très différentes pour le taux de sélection (paramètre fraction dans .sample()) :

# Combien de lignes de chaque classe contient spamDF (attention opération coûteuse)
spamDF.groupBy("label").count().show()
# Définition d'échantillons (échantillonnage simple)
# On ne garde que les colonnes "val0", "val1" et "label"
spamEch1 = spamDF.select("val0", "val1", "label").sample(False, 0.5)
print(spamEch1.count())
spamEch2 = spamDF.select("val0", "val1", "label").sample(False, 0.1)
print(spamEch2.count())
# Opérations possibles aussi mais coûteuses
spamEch1.groupBy("label").count().show()
spamEch2.groupBy("label").count().show()

La réduction du DataFrame à 3 colonnes (avec .select("val0", "val1", "label")) est simplement effectuée pour améliorer la lisibilité de certaines des opérations suivantes.

Question :

Que pouvez-vous constater en comparant le nombre de lignes de spamEch1 ou spamEch2 et celui de spamDF multiplié par la valeur de fraction correspondante ? Refaites l’échantillonnage (avec les mêmes valeurs de taux de sélection) et comparez. Expliquez.

Correction :

Il y a un écart important entre 4601 * 0.5 et le nombre de lignes de spamEch1, ainsi qu’entre 4601 * 0.1 et le nombre de lignes de spamEch2. De plus, si on refait l’échantillonnage on n’obtient pas nécessairement le même nombre de lignes. La valeur de fraction représente la probabilité de retenir une ligne du DataFrame de départ dans l’échantillon (même valeur valable pour toutes les lignes) et non un « taux de sélection » dans le sens nb_lignes_DataFrame_de_départ * taux_de_sélection = nb_lignes_DataFrame_échantillon.

Examinons maintenant des statistiques simples calculées sur les colonnes des 3 DataFrame :

summary  = spamDF.select("val0", "val1", "label").describe().show()
summary1 = spamEch1.describe().show()
summary2 = spamEch2.describe().show()

Question :

Quel constat faites-vous en comparant les moyennes calculées pour une même variable (colonne) sur spamDF.select("val0", "val1", "label") et sur les deux échantillons ? Expliquez.

Correction :

Les estimations faites à partir d’un échantillon sont (naturellement) différentes des valeurs obtenues sur la totalité des données et l’erreur d’estimation augmente en général avec la réduction de la taille de l’échantillon.

Échantillonnage stratifié

L’échantillonnage stratifié permet de fixer le taux d’échantillonnage pour chaque strate. Pour un DataFrame, les strates sont définis par les valeurs uniques d’une des colonnes (une valeur définit un strate). La méthode .sampleBy() de DataFrameStatFunctions (voir la documentation API Spark en Python) respecte de façon approximative les taux d’échantillonnage indiqués. Il est nécessaire d’indiquer la valeur d’initialisation (seed) du générateur aléatoire (ci-dessous 1). Un nouvel appel à la même fonction avec la même valeur de seed permettrait d’obtenir exactement le même échantillon. En règle générale, il est conseillé d’utiliser des valeurs aléatoires élevées pour seed et non de valeur faible fréquemment employée (comme 1 ici).

# Définition des fractions par strate
fractions = {0.0: 0.5, 1.0: 0.5}    # dictionnaire

# Échantillonnage stratifié (approximatif)
spamEchStratifie = spamDF.select("val0", "val1", "label").stat.sampleBy("label", fractions, 1)
spamEchStratifie.groupBy("label").count().show()

Question :

Refaites l’échantillonnage avec une autre valeur de seed et comparez les résultats. Que constatez-vous ?

Correction :

Avec une autre valeur, les effectifs des strates dans l’échantillon sont assez sensiblement différents.

Analyse en composantes principales (ACP)

MLlib donne la possibilité de calculer directement les k premières composantes principales pour un ensemble d’observations. Les N observations sont représentées par une ou plusieurs colonnes d’un DataFrame dont le nombre de lignes est égal au nombre d’observations (N). Le DataFrame peut contenir une colonne de type Vector avec autant de composantes qu’il y a de variables soumises à l’ACP (nombre noté ici par n), ou alors plusieurs colonnes qu’il faut regrouper au préalable pour obtenir une colonne de type Vector.

Les k premières composantes principales sont les vecteurs propres d’une ACP centrée mais non normée (les variables sont transformées pour être de moyenne nulle mais la variance n’est pas modifiée pour devenir égale à 1) appliquée aux observations. Les valeurs propres sont retournées triées par ordre décroissant et exprimées en proportion de variance expliquée dans un DenseVector accessible par PCAModel.explainedVariance. Les composantes principales (vecteurs propres unitaires associés aux valeurs propres) sont retournées dans une matrice locale DenseMatrix de n lignes (le nombre de variables initiales) et k colonnes (le nombre de composantes principales demandées) accessible par PCAModel.pc. Les composantes principales sont triées en ordre décroissant des valeurs propres correspondantes.

L’ACP est réalisée en définissant une instance de PCA et en appelant la méthode .fit() pour cette instance. Le résultat est un PCAModel. La projection des données sur les k premières composantes principales est obtenue en appelant la méthode .transform() pour ce PCAModel.

Dans la suite, nous appliquerons l’ACP sur les données de Spambase Data Set, ainsi que sur un échantillon de ces données et sur des données aléatoires générées suivant une loi normale multidimensionnelle isotrope (c’est à dire de même variance dans toutes les directions). Nous visualiserons les résultats.

Réaliser l’ACP

ACP sur le Spambase Data Set

La première opération consiste à regrouper les colonnes à analyser du DataFrame spamDF dans une nouvelle colonne de type Vector en utilisant le VectorAssembler :

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Construction du VectorAssembler
colsEntree = ["val"+str(i) for i in range(0, 57)]
assembleur = VectorAssembler().setInputCols(colsEntree).setOutputCol("features")

# Construction du Dataframe spamDFA en appliquant le VectorAssembler
spamDFA = assembleur.transform(spamDF)
spamDFA.printSchema()
spamDFA.select("features").show(3)

La colonne features contient désormais un vecteur (type Vector) qui rassemble les 57 valeurs des colonnes initiales. Nous pouvons désormais appliquer l’ACP sur les données de spamDFA.select("features"). Par défaut, Spark applique une ACP centrée mais non normée :

from pyspark.ml.feature import PCA

# Construction et application d'une nouvelle instance d'estimateur PCA
pcaModel = PCA().setInputCol("features").setOutputCol("pcaFeatures") \
                .setK(3).fit(spamDFA)
# Syntaxe équivalente :
#pcaModel = PCA(k=3, inputCol="features", outputCol="pcaFeatures").fit(spamDFA)

# Application du « transformateur » PCAModel résultant de l'estimation
resultat = pcaModel.transform(spamDFA).select("pcaFeatures")

# Les 3 plus grandes valeurs propres (exprimées en proportion de variance expliquée)
print(pcaModel.explainedVariance)

# Vecteurs propres correspondants
print(pcaModel.pc)

Afin d’obtenir une ACP centrée et normée, il est nécessaire de normaliser les variables (les transformer pour que chaque variable soit de moyenne nulle et écart-type égal à 1) avant application de l’ACP. Pour cela, il est possible de faire appel à StandardScaler :

from pyspark.ml.feature import StandardScaler

# Construction d'une nouvelle instance d'estimateur StandardScaler
scaler = StandardScaler().setInputCol("features").setOutputCol("scaledFeatures") \
                         .setWithStd(True).setWithMean(True)
# Syntaxe équivalente :
#scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", \
#                         withStd=True, withMean=True)

# Application de la nouvelle instance d'estimateur StandardScaler
scalerModel = scaler.fit(spamDFA)

La variable scalerModel contient désormais le modèle de normalisation adapté au jeu de données spamDFA. Il est possible d’inspecter les statistiques qui ont été estimées sur le DataFrame :

# Examen des moyennes et des variances calculées dans le StandardScalerModel résultant
print(scalerModel.mean)
print(scalerModel.std)

Une fois le modèle obtenu, il peut être appliqué sur n’importe quel jeu de données afin de le normaliser. Dans notre cas, nous l’utilisons pour normaliser (centrage et réduction) les éléments du jeu de données spamDFA :

# Application du transformer StandardScalerModel résultant
scaledSpamDF = scalerModel.transform(spamDFA).select("scaledFeatures", "label")

# scaledSpamDF a une seule colonne de Vectors, nommée "scaledFeatures"
scaledSpamDF.printSchema()

Question :

Appliquez l’ACP sur le DataFrame scaledSpamDF et mettez les projections des données sur les trois premiers axes principaux dans un DataFrame resultatCR. Examinez les trois plus grandes valeurs propres et comparez-les à celles obtenues précédemment. Que constatez-vous ?

Correction

    # Construction et application d'une nouvelle instance d'estimateur PCA
    pcaCR = PCA().setInputCol("scaledFeatures").setOutputCol("pcaCRFeatures") \
                 .setK(3).fit(scaledSpamDF)

    # Application du « transformateur » PCAModel résultant de l'estimation
    resultatCR = pcaCR.transform(scaledSpamDF).select("pcaCRFeatures")

    # Les 3 plus grandes valeurs propres (exprimées en proportion de variance expliquée)
    print(pcaCR.explainedVariance)
    # à comparer avec
    print(pcaModel.explainedVariance)

Les écarts entre les 3 plus grandes valeurs propres sont plus faibles sur les données centrées et réduites que sur les données seulement centrées. Cela montre que les variables initiales avaient des variances très différentes (après centrage).

Question :

Appliquez l’ACP non normée sur un échantillon (appelé spamEch3) de 10% du DataFrame spamDF obtenu par échantillonnage simple et mettez les projections des données sur les trois premiers axes principaux dans un DataFrame resultatEch3. Examinez les trois plus grandes valeurs propres et comparez-les à celles obtenues dans l’ACP non normée de spamDF. Que constatez-vous ?

Correction

   spamEch3 = spamDFA.sample(False, 0.1)
   pcaEch3 = PCA().setInputCol("features").setOutputCol("pcaFeatures") \
                  .setK(3).fit(spamEch3)
   resultatEch3 = pcaEch3.transform(spamEch3).select("pcaFeatures")
   pcaEch3.explainedVariance

Les valeurs propres obtenues sur un échantillon d'environ 10% sont naturellement différentes de celles obtenues sur la totalité des données, les écarts **relatifs** étant toutefois plus faible pour les valeurs propres les plus grandes que pour les suivantes.

ACP sur des données aléatoires

À titre d’exemple, nous pouvons également appliquer l’ACP sur des données tridimensionnelles obtenues par tirage aléatoire suivant une loi normale isotrope :

from pyspark.sql.functions import randn
from pyspark.ml.feature import PCA

# Création d'un Dataframe d'identifiants entre 0 et 4600
idsDF = spark.range(4601)
idsDF.show(5)

# Création d'un Dataframe avec 3 colonnes de valeurs aléatoires
#  tirées d'une loi normale d'espérance nulle et écart-type 1
randnDF = idsDF.select("id", randn(seed=1).alias("normale0"), \
                             randn(seed=2).alias("normale1"), \
                             randn(seed=3).alias("normale2"))
# Vérification : moyennes ~ 0, écart-types ~ 1 pour colonnes normale...
randnDF.describe().show()

# Construction du VectorAssembler
colsEntRnd = ["normale"+str(i) for i in range(0, 3)]
assRnd = VectorAssembler().setInputCols(colsEntRnd).setOutputCol("features")

# Construction du Dataframe randnDF2 en appliquant le VectorAssembler
randnDF2 = assRnd.transform(randnDF)
randnDF2.printSchema()

# Application de l'ACP
pcaRnd = PCA().setInputCol("features").setOutputCol("pcaRndFeatures") \
              .setK(3).fit(randnDF2)
resRnd = pcaRnd.transform(randnDF2).select("pcaRndFeatures")
pcaRnd.explainedVariance

Question :

Examinez les trois premières valeurs propres et expliquez le résultat obtenu.

Correction :

Les valeurs propres obtenues sur les données aléatoires suivant une loi normale isotrope (de même variance suivant les trois variables initiales) sont pratiquement égales, le nuage de points est presque sphérique ; les vecteurs propres ne sont pas utiles dans ce cas car une faible perturbation dans l’échantillon aléatoire suivant la loi normale isotrope aura comme conséquence un changement potentiellement fort dans les vecteurs propres associés aux trois plus grandes valeurs propres.

Visualiser les résultats

Dans cette séance de travaux pratiques nous visualiserons les données à l’aide de la bibliothèque Matplotlib. D’autres options sont les bibliothèques Plotly et Seaborn.

Nous souhaitons désormais produire un nuage de points en deux dimensions représentant chaque courriel projeté sur les deux axes principaux de l’ACP.

Question

Réaliser une analyse en composantes principales centrée et réduite sur scaledSpamDF, mais en ne conservant cette fois-ci que les deux premiers axes principaux. Mettre le résultat dans une variable resultatCR_2D.

Correction

Il s’agit simplement d’appliquer sur les données centrées-réduites une ACP avec \(k = 2\).

pcaCR = PCA().setInputCol("scaledFeatures").setOutputCol("pcaCRFeatures") \
             .setK(2).fit(scaledSpamDF)

# Application du « transformateur » PCAModel résultant de l'estimation
resultatCR_2D = pcaCR.transform(scaledSpamDF).select("pcaCRFeatures")

Afin de réaliser la visualisation avec Matplotlib, une solution simple est de passer par des fonctionnalités de la bibliothèque Python pandas. Le DataFrame Spark contenant les données à afficher est d’abord converti à un dataframe pandas sur le nœud driver, ce qui implique que toutes les données du DataFrame situées sur les nœuds de calcul sont ramenées sur le nœud driver, comme pour une opération .collect(). Si les données sont très volumineuses, cette opération est très coûteuse et peut même provoquer un débordement mémoire sur le driver. Dans le cas de données volumineuses il faut envisager d’extraire un (bien plus petit) échantillon de données dans un nouveau DataFrame et de convertir ce dernier à un dataframe pandas. Noter que l’utilisation de Apache Arrow permet d’optimiser l’échange entre DataFrames Spark (distribués) et dataframes pandas (localisés sur le nœud driver), voir cette description.

from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col
import pandas as pd

# Extraction de deux colonnes à partir de l'unique colonne de vecteurs 2D
#  et création de dataframe pandas à partir du DataFrame Spark
donneesAafficher = resultatCR_2D.withColumn("x", vector_to_array("pcaCRFeatures")) \
                                .select([col("x")[i] for i in range(2)]).toPandas()
donneesAafficher.head(5)

# Visualisation de type scatter plot
import matplotlib.pyplot as plt
donneesAafficher.plot.scatter('x[0]', 'x[1]')

Question

Réaliser la même visualisation dans le cas des données aléatoires. Que constatez-vous ?

Correction :

# Les projections sur les trois composantes sont extraites des vecteurs denses
donneesRndAafficher = resRnd.withColumn("x", vector_to_array("pcaRndFeatures")) \
                            .select([col("x")[i] for i in range(3)]).toPandas()
donneesRndAafficher.head(5)
# mais seules les projections sur deux composantes sont affichées
donneesRndAafficher.plot.scatter('x[0]', 'x[1]')

Dans le cas des données aléatoires, l’analyse en composantes principales ne permet pas de déterminer de direction principale car la distribution des données est isotrope (même variance dans toutes les directions). Par conséquent, la visualisation obtenue est simplement une boule bidimensionnelle.

Question

Pour terminer, effectuez la visualisation sur les données Spambase Data Set mais en ajoutant une couleur différente en fonction de l’étiquette du point considéré.

Correction

Pour visualiser les étiquettes, il faut conserver la colonne label après l’ACP :

resultatCR_2D = pcaCR.transform(scaledSpamDF).select("pcaCRFeatures", "label")

On peut ensuite ajouter l’option à la visualisation précédente.

donneesAafficher = resultatCR_2D.withColumn("x", vector_to_array("pcaCRFeatures")) \
                                .select(["label"] + [col("x")[i] for i in range(2)]) \
                                .toPandas()

# Visualisation de type scatter plot avec une couleur par étiquette
donneesAafficher.plot.scatter('x[0]', 'x[1]', c='label', cmap='coolwarm')