Travaux pratiques - Echantillonnage. Analyse en composantes principales¶
(une variante de ce TP utilisant le langage Scala)
Références externes utiles :
L’objectif de cette séance de TP est de présenter les fonctionnalités proposées par Spark pour l’échantillonnage des données, ainsi que l’utilisation de l’analyse en composantes principales (ACP) dans Spark.
Les exemples ci-dessous reprennent, en partie, ceux des documentations en ligne.
Échantillonnage¶
Lecture des données¶
Nous travaillerons lors de cette séance sur des données Spambase Data Set issues de l’archive de l’UCI. Il s’agit d’une collection de courriels étiquetés comme spam (indésirable) ou normal. Vous trouverez sur le site de l’UCI des explications plus détaillées concernant ces données, regardez-les, notamment la signification des variables et les classes auxquelles appartiennent les données.
Pour préparer les répertoires et télécharger les données, ouvrez une fenêtre terminal et entrez les commandes suivantes :
%%bash
mkdir -p tpacp/data
wget -nc https://archive.ics.uci.edu/ml/machine-learning-databases/spambase/spambase.data -P tpacp/data/
pyspark
Sous Windows, créez le répertoire tpacp
et le sous-répertoire data
, téléchargez le fichier de données dans le sous-répertoire data
, ouvrez une fenêtre invite de commandes et lancez pyspark
.
Dans le fichier spambase.data
, chaque ligne contient un vecteur de 57 valeurs float
et ensuite une étiquette de classe (0 ou 1). Nous allons maintenant importer ces données dans Spark.
from pyspark.sql.types import DoubleType, StructType, StructField
# Génération des StructField correspondant aux 57 premières colonnes
spamFields = [StructField("val"+str(i), DoubleType(), True) for i in range(0, 57)]
# Construction du schéma complet du Dataframe (incluant la dernière colonne)
spamSchema = StructType(spamFields).add("label", DoubleType(), True)
# Lecture des données et création du Dataframe
spamDF = spark.read.format("csv").schema(spamSchema).load("tpacp/data/spambase.data").cache()
spamDF.printSchema() # vérification
Nous nous servirons du DataFrame spamDF
dans la suite de cette séance.
Question :
Expliquez la construction de spamSchema
.
Échantillonnage simple¶
Nous allons d’abord générer deux DataFrame, spamEch1
et spamEch2
, qui sont des échantillons du DataFrame spamDF
(réduit aux 3 premières colonnes) avec deux valeurs très différentes pour le taux de sélection (paramètre fraction
dans .sample()
) :
# Combien de lignes de chaque classe contient spamDF (attention opération coûteuse)
spamDF.groupBy("label").count().show()
# Définition d'échantillons (échantillonnage simple)
# On ne garde que les colonnes "val0", "val1" et "label"
spamEch1 = spamDF.select("val0", "val1", "label").sample(False, 0.5)
print(spamEch1.count())
spamEch2 = spamDF.select("val0", "val1", "label").sample(False, 0.1)
print(spamEch2.count())
# Opérations possibles aussi mais coûteuses
spamEch1.groupBy("label").count().show()
spamEch2.groupBy("label").count().show()
La réduction du DataFrame à 3 colonnes (avec .select("val0", "val1", "label")
) est simplement effectuée pour améliorer la lisibilité de certaines des opérations suivantes.
Question :
Que pouvez-vous constater en comparant le nombre de lignes de spamEch1
ou spamEch2
et celui de spamDF
multiplié par la valeur de fraction
correspondante ? Refaites l’échantillonnage (avec les mêmes valeurs de taux de sélection) et comparez. Expliquez.
Examinons maintenant des statistiques simples calculées sur les colonnes des 3 DataFrame :
summary = spamDF.select("val0", "val1", "label").describe().show()
summary1 = spamEch1.describe().show()
summary2 = spamEch2.describe().show()
Question :
Quel constat faites-vous en comparant les moyennes calculées pour une même variable (colonne) sur spamDF.select("val0", "val1", "label")
et sur les deux échantillons ? Expliquez.
Échantillonnage stratifié¶
L’échantillonnage stratifié permet de fixer le taux d’échantillonnage pour chaque strate. Pour un DataFrame, les strates sont définis par les valeurs uniques d’une des colonnes (une valeur définit un strate). La méthode .sampleBy()
de DataFrameStatFunctions
(voir la documentation API Spark en Python) respecte de façon approximative les taux d’échantillonnage indiqués.
Il est nécessaire d’indiquer la valeur d’initialisation (seed) du générateur aléatoire (ci-dessous 1
). Un nouvel appel à la même fonction avec la même valeur de seed permettrait d’obtenir exactement le même échantillon. En règle générale, il est conseillé d’utiliser des valeurs aléatoires élevées pour seed et non de valeur faible fréquemment employée (comme 1 ici).
# Définition des fractions par strate
fractions = {0.0: 0.5, 1.0: 0.5} # dictionnaire
# Échantillonnage stratifié (approximatif)
spamEchStratifie = spamDF.select("val0", "val1", "label").stat.sampleBy("label", fractions, 1)
spamEchStratifie.groupBy("label").count().show()
Question :
Refaites l’échantillonnage avec une autre valeur de seed et comparez les résultats. Que constatez-vous ?
Analyse en composantes principales (ACP)¶
MLlib donne la possibilité de calculer directement les k premières composantes principales pour un ensemble d’observations. Les N observations sont représentées par une ou plusieurs colonnes d’un DataFrame dont le nombre de lignes est égal au nombre d’observations (N). Le DataFrame peut contenir une colonne de type Vector
avec autant de composantes qu’il y a de variables soumises à l’ACP (nombre noté ici par n), ou alors plusieurs colonnes qu’il faut regrouper au préalable pour obtenir une colonne de type Vector
.
Les k premières composantes principales sont les vecteurs propres d’une ACP centrée mais non normée (les variables sont transformées pour être de moyenne nulle mais la variance n’est pas modifiée pour devenir égale à 1) appliquée aux observations. Les valeurs propres sont retournées triées par ordre décroissant et exprimées en proportion de variance expliquée dans un DenseVector
accessible par PCAModel.explainedVariance
. Les composantes principales (vecteurs propres unitaires associés aux valeurs propres) sont retournées dans une matrice locale DenseMatrix
de n lignes (le nombre de variables initiales) et k colonnes (le nombre de composantes principales demandées) accessible par PCAModel.pc
. Les composantes principales sont triées en ordre décroissant des valeurs propres correspondantes.
L’ACP est réalisée en définissant une instance de PCA et en appelant la méthode .fit()
pour cette instance. Le résultat est un PCAModel. La projection des données sur les k premières composantes principales est obtenue en appelant la méthode .transform()
pour ce PCAModel.
Dans la suite, nous appliquerons l’ACP sur les données de Spambase Data Set, ainsi que sur un échantillon de ces données et sur des données aléatoires générées suivant une loi normale multidimensionnelle isotrope (c’est à dire de même variance dans toutes les directions). Nous visualiserons les résultats.
Réaliser l’ACP¶
ACP sur le Spambase Data Set¶
La première opération consiste à regrouper les colonnes à analyser du DataFrame spamDF
dans une nouvelle colonne de type Vector
en utilisant le VectorAssembler
:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
# Construction du VectorAssembler
colsEntree = ["val"+str(i) for i in range(0, 57)]
assembleur = VectorAssembler().setInputCols(colsEntree).setOutputCol("features")
# Construction du Dataframe spamDFA en appliquant le VectorAssembler
spamDFA = assembleur.transform(spamDF)
spamDFA.printSchema()
spamDFA.select("features").show(3)
La colonne features contient désormais un vecteur (type Vector
) qui rassemble les 57 valeurs des colonnes initiales.
Nous pouvons désormais appliquer l’ACP sur les données de spamDFA.select("features")
.
Par défaut, Spark applique une ACP centrée mais non normée :
from pyspark.ml.feature import PCA
# Construction et application d'une nouvelle instance d'estimateur PCA
pcaModel = PCA().setInputCol("features").setOutputCol("pcaFeatures") \
.setK(3).fit(spamDFA)
# Syntaxe équivalente :
#pcaModel = PCA(k=3, inputCol="features", outputCol="pcaFeatures").fit(spamDFA)
# Application du « transformateur » PCAModel résultant de l'estimation
resultat = pcaModel.transform(spamDFA).select("pcaFeatures")
# Les 3 plus grandes valeurs propres (exprimées en proportion de variance expliquée)
print(pcaModel.explainedVariance)
# Vecteurs propres correspondants
print(pcaModel.pc)
Afin d’obtenir une ACP centrée et normée, il est nécessaire de normaliser les variables (les transformer pour que chaque variable soit de moyenne nulle et écart-type égal à 1) avant application de l’ACP.
Pour cela, il est possible de faire appel à StandardScaler
:
from pyspark.ml.feature import StandardScaler
# Construction d'une nouvelle instance d'estimateur StandardScaler
scaler = StandardScaler().setInputCol("features").setOutputCol("scaledFeatures") \
.setWithStd(True).setWithMean(True)
# Syntaxe équivalente :
#scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", \
# withStd=True, withMean=True)
# Application de la nouvelle instance d'estimateur StandardScaler
scalerModel = scaler.fit(spamDFA)
La variable scalerModel
contient désormais le modèle de normalisation adapté au jeu de données spamDFA
. Il est possible d’inspecter les statistiques qui ont été estimées sur le DataFrame
:
# Examen des moyennes et des variances calculées dans le StandardScalerModel résultant
print(scalerModel.mean)
print(scalerModel.std)
Une fois le modèle obtenu, il peut être appliqué sur n’importe quel jeu de données afin de le normaliser. Dans notre cas, nous l’utilisons pour normaliser (centrage et réduction) les éléments du jeu de données spamDFA
:
# Application du transformer StandardScalerModel résultant
scaledSpamDF = scalerModel.transform(spamDFA).select("scaledFeatures", "label")
# scaledSpamDF a une seule colonne de Vectors, nommée "scaledFeatures"
scaledSpamDF.printSchema()
Question :
Appliquez l’ACP sur le DataFrame scaledSpamDF
et mettez les projections des données sur les trois premiers axes principaux dans un DataFrame resultatCR
. Examinez les trois plus grandes valeurs propres et comparez-les à celles obtenues précédemment. Que constatez-vous ?
Question :
Appliquez l’ACP non normée sur un échantillon (appelé spamEch3
) de 10% du DataFrame spamDF
obtenu par échantillonnage simple et mettez les projections des données sur les trois premiers axes principaux dans un DataFrame resultatEch3
. Examinez les trois plus grandes valeurs propres et comparez-les à celles obtenues dans l’ACP non normée de spamDF
. Que constatez-vous ?
ACP sur des données aléatoires¶
À titre d’exemple, nous pouvons également appliquer l’ACP sur des données tridimensionnelles obtenues par tirage aléatoire suivant une loi normale isotrope :
from pyspark.sql.functions import randn
from pyspark.ml.feature import PCA
# Création d'un Dataframe d'identifiants entre 0 et 4600
idsDF = spark.range(4601)
idsDF.show(5)
# Création d'un Dataframe avec 3 colonnes de valeurs aléatoires
# tirées d'une loi normale d'espérance nulle et écart-type 1
randnDF = idsDF.select("id", randn(seed=1).alias("normale0"), \
randn(seed=2).alias("normale1"), \
randn(seed=3).alias("normale2"))
# Vérification : moyennes ~ 0, écart-types ~ 1 pour colonnes normale...
randnDF.describe().show()
# Construction du VectorAssembler
colsEntRnd = ["normale"+str(i) for i in range(0, 3)]
assRnd = VectorAssembler().setInputCols(colsEntRnd).setOutputCol("features")
# Construction du Dataframe randnDF2 en appliquant le VectorAssembler
randnDF2 = assRnd.transform(randnDF)
randnDF2.printSchema()
# Application de l'ACP
pcaRnd = PCA().setInputCol("features").setOutputCol("pcaRndFeatures") \
.setK(3).fit(randnDF2)
resRnd = pcaRnd.transform(randnDF2).select("pcaRndFeatures")
pcaRnd.explainedVariance
Question :
Examinez les trois premières valeurs propres et expliquez le résultat obtenu.
Visualiser les résultats¶
Dans cette séance de travaux pratiques nous visualiserons les données à l’aide de la bibliothèque Matplotlib. D’autres options sont les bibliothèques Plotly et Seaborn.
Nous souhaitons désormais produire un nuage de points en deux dimensions représentant chaque courriel projeté sur les deux axes principaux de l’ACP.
Question
Réaliser une analyse en composantes principales centrée et réduite sur scaledSpamDF
, mais en ne conservant cette fois-ci que les deux premiers axes principaux. Mettre le résultat dans une variable resultatCR_2D
.
Afin de réaliser la visualisation avec Matplotlib, une solution simple est de passer par des fonctionnalités de la bibliothèque Python pandas
. Le DataFrame Spark contenant les données à afficher est d’abord converti à un dataframe pandas
sur le nœud driver, ce qui implique que toutes les données du DataFrame situées sur les nœuds de calcul sont ramenées sur le nœud driver, comme pour une opération .collect()
. Si les données sont très volumineuses, cette opération est très coûteuse et peut même provoquer un débordement mémoire sur le driver. Dans le cas de données volumineuses il faut envisager d’extraire un (bien plus petit) échantillon de données dans un nouveau DataFrame et de convertir ce dernier à un dataframe pandas
. Noter que l’utilisation de Apache Arrow permet d’optimiser l’échange entre DataFrames Spark (distribués) et dataframes pandas
(localisés sur le nœud driver), voir cette description.
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col
import pandas as pd
# Extraction de deux colonnes à partir de l'unique colonne de vecteurs 2D
# et création de dataframe pandas à partir du DataFrame Spark
donneesAafficher = resultatCR_2D.withColumn("x", vector_to_array("pcaCRFeatures")) \
.select([col("x")[i] for i in range(2)]).toPandas()
donneesAafficher.head(5)
# Visualisation de type scatter plot
import matplotlib.pyplot as plt
donneesAafficher.plot.scatter('x[0]', 'x[1]')
Question
Réaliser la même visualisation dans le cas des données aléatoires. Que constatez-vous ?
Question
Pour terminer, effectuez la visualisation sur les données Spambase Data Set mais en ajoutant une couleur différente en fonction de l’étiquette du point considéré.