Travaux pratiques - Echantillonnage. Analyse en composantes principales

(la version précédente de cette séance, utilisant l’API RDD)

Références externes utiles :

L’objectif de cette séance de TP est de présenter les fonctionnalités proposées par Spark pour l’échantillonnage des données, ainsi que l’utilisation de l’analyse en composantes principales (ACP) dans Spark.

Les exemples ci-dessous reprennent, en partie, ceux des documentations en ligne.

Préparation de la visualisation

La visualisation nécessite d’avoir au préalable importé la bibliothèque vegas-viz.

Avec la gestion dynamique des dépendances pour Jupyter/Toree, il suffit d’utiliser la commande magique %AddDeps :

// Vegas v0.3.11 pour Scala 2.11
// L'option --transitive permet de télécharger également les dépendances de Vegas

%AddDeps org.vegas-viz vegas_2.11 0.3.11 --transitive
%AddDeps org.vegas-viz vegas-spark_2.11 0.3.11 --transitive

Échantillonnage

Lecture des données

Nous travaillerons lors de cette séance sur des données Spambase Data Set issues de l’archive de l’UCI. Il s’agit d’une collection de courriels étiquetés comme spam (indésirable) ou normal. Vous trouverez sur le site de l’UCI des explications plus détaillées concernant ces données, regardez-les, notamment la signification des variables et les classes auxquelles appartiennent les données.

import sys.process._

// Créé un dossier data
"mkdir -p data" !

// Télécharge les données Spambase
"wget -nc https://archive.ics.uci.edu/ml/machine-learning-databases/spambase/spambase.data -P data" !

Dans le fichier spambase.data, chaque ligne contient un vecteur de 57 valeurs float et ensuite une étiquette de classe (0 ou 1). Nous allons maintenant importer ces données dans Spark.

Note

Si vous n’utilisez pas les carnets Jupyter, dans une nouvelle fenêtre terminal placez-vous dans le répertoire de votre TP et lancez spark-shell --driver-memory 1g. L’option --driver-memory 1g permet de spécifier à la machine virtuelle Java que l’on souhaite allouer 1 Go de mémoire (plutôt que 512 Mo) par défaut, ce qui est un peu juste pour ce TP.

Sur les machines de TP du Cnam, si la commande spark-shell n’est pas trouvée, entrez d’abord export PATH="$PATH:/opt/spark/bin" et ensuite seulement spark-shell --driver-memory 1g.

import org.apache.spark.sql.types._

// Fonction qui produit le schéma du Dataframe sans la dernière colonne
def genSpamFields(from: Int, to: Int) = for (i <- from until to)
                         yield StructField("val"+i.toString, DoubleType, true)

// Génération des StructField successifs correspondant aux 57 premières colonnes
val spamFields = genSpamFields(0, 57)

// Construction du schéma complet du Dataframe (incluant la dernière colonne)
val spamSchema = StructType(spamFields).add("label", DoubleType, true)

// Lecture des données et création du Dataframe
val spamDF = spark.read.format("csv").schema(spamSchema).load("data/spambase.data").cache()
spamDF.printSchema()    // vérification

Nous nous servirons du DataFrame spamDF dans la suite de cette séance.

Question :

Expliquez la construction de spamSchema.

Correction :

Un schéma StructType de DataFrame est une succession de StructField. Dans les données de spambase.data, les 57 premières colonnes sont des float et la 58ème est un int correspondant à l’étiquette de classe (0 signifie non spam, 1 signifie spam). La fonction genSpamFields() produit une séquence de StructField. Dans spamFields nous obtenons donc 57 StructField avec des noms "val...", de type DoubleType et dont des valeurs peuvent être absentes (signification de true). cette séquence est convertie en StructType et on lui ajoute à la fin le StructField correspondant à la dernière colonne, l’étiquette (le type DoubleType est conservé pour des considérations pratiques).

Échantillonnage simple

Nous allons d’abord générer deux DataFrame, spamEch1 et spamEch2, qui sont des échantillons du DataFrame spamDF (réduit aux 3 premières colonnes) avec deux valeurs très différentes pour le taux de sélection (paramètre fraction dans Dataset.sample()) :

// Combien de lignes de chaque classe contient spamDF (attention opération coûteuse)
spamDF.groupBy("label").count().show()
// Définition d'échantillons (échantillonnage simple)
// On ne garde que les colonnes "val0", "val1" et "label"
val spamEch1 = spamDF.select("val0", "val1", "label").sample(false, 0.5)
println(spamEch1.count())
val spamEch2 = spamDF.select("val0", "val1", "label").sample(false, 0.1)
println(spamEch2.count())
// Opérations possibles aussi mais coûteuses
spamEch1.groupBy("label").count().show()
spamEch2.groupBy("label").count().show()

La réduction du DataFrame à 3 colonnes (avec .select("val0", "val1", "label")) est simplement effectuée pour améliorer la lisibilité de certaines des opérations suivantes.

Question :

Que pouvez-vous constater en comparant le nombre de lignes de spamEch1 ou spamEch2 et celui de spamDF multiplié par la valeur de fraction correspondante ? Refaites l’échantillonnage (avec les mêmes valeurs de taux de sélection) et comparez. Expliquez.

Correction :

Il y a un écart important entre 4601 * 0.5 et le nombre de lignes de spamEch1, ainsi qu’entre 4601 * 0.1 et le nombre de lignes de spamEch2. De plus, si on refait l’échantillonnage on n’obtient pas nécessairement le même nombre de lignes. La valeur de fraction représente la probabilité de retenir une ligne du DataFrame de départ dans l’échantillon (même valeur valable pour toutes les lignes) et non un « taux de sélection » dans le sens nb_lignes_DataFrame_de_départ * taux_de_sélection = nb_lignes_DataFrame_échantillon.

Examinons maintenant des statistiques simples calculées sur les colonnes des 3 DataFrame :

val summary  = spamDF.select("val0", "val1", "label").describe()
val summary1 = spamEch1.describe()
val summary2 = spamEch2.describe()
summary.show()
summary1.show()
summary2.show()

Question :

Quel constat faites-vous en comparant les moyennes calculées pour une même variable (colonne) sur spamDF.select("val0", "val1", "label") et sur les deux échantillons ? Expliquez.

Correction :

Les estimations faites à partir d’un échantillon sont (naturellement) différentes des valeurs obtenues sur la totalité des données et l’erreur d’estimation augmente en général avec la réduction de la taille de l’échantillon.

Échantillonnage stratifié

L’échantillonnage stratifié permet de fixer le taux d’échantillonnage pour chaque strate. Pour un DataFrame, les strates sont définis par les valeurs uniques d’une des colonnes (une valeur définit un strate). La méthode .sampleBy() de DataFrameStatFunctions (voir la documentation API Spark en Scala) respecte de façon approximative les taux d’échantillonnage indiqués. Il est nécessaire d’indiquer la valeur d’initialisation (seed) du générateur aléatoire (ci-dessous 1L).

// Définition des fractions par strate
val fractions = Map(0.0 -> 0.5, 1.0 -> 0.5)    // Map[K, Double]

// Échantillonnage stratifié (approximatif)
val spamEchStratifie = spamDF.select("val0", "val1", "label").stat.sampleBy("label", fractions, 1L)
spamEchStratifie.groupBy("label").count().show()

Question :

Refaites l’échantillonnage avec une autre valeur de seed et comparez les résultats. Que constatez-vous ?

Correction :

Avec une autre valeur, les effectifs des strates dans l’échantillon sont assez sensiblement différents.

Dans l’état actuel de développement de l’API Dataframe, l’échantillonnage stratifié plus précis (respectant de plus près les taux de sélection indiqués dans le Map()) n’est pas directement accessible. Il est en revanche possible de passer pour cela par l’API RDD, dans laquelle on trouve une méthode sampleByKeyExact qui répond à cette attente (tout en étant plus coûteuse) :

import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.PairRDDFunctions
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
import org.apache.spark.ml.linalg.Vectors

// Echantillonnage stratifié exact avec .sampleByKeyExact() en passant par un RDD
val spamEchStratRDD = spamDF.select("val0", "val1", "label").rdd
           .map(l => (l.getAs[Double](2), Vectors.dense(l.getAs[Double](0),l.getAs[Double](1))))
           .sampleByKeyExact(false, fractions).map(l => Row(l._1, l._2))

// Définition du schéma pour le Dataframe
val schemaReduit = StructType(Seq(StructField("label", DoubleType, true),
                                         StructField("valeurs", VectorType, true)))

// Reconversion en Dataframe
val spamEchStratifieExact = spark.createDataFrame(spamEchStratRDD, schemaReduit)

// Examen de l'échantillon
spamEchStratifieExact.groupBy("label").count().show()

L’échantillon spamEchStratifieExact respecte mieux les taux de sélection demandés dans fractions mais est bien plus coûteux à obtenir que spamEchStratifie.

Analyse en composantes principales (ACP)

MLlib donne la possibilité de calculer directement les k premières composantes principales pour un ensemble d’observations. Les N observations sont représentées par une ou plusieurs colonnes d’un DataFrame dont le nombre de lignes est égal au nombre d’observations (N). Le DataFrame peut contenir une colonne de type Vector avec autant de composantes qu’il y a de variables soumises à l’ACP (nombre noté ici par n), ou alors plusieurs colonnes qu’il faut regrouper au préalable pour obtenir une colonne de type Vector.

Les k premières composantes principales sont les vecteurs propres d’une ACP centrée mais non normée (les variables sont transformées pour être de moyenne nulle mais la variance n’est pas modifiée pour devenir égale à 1) appliquée aux observations. Les valeurs propres sont retournées triées par ordre décroissant et exprimées en proportion de variance expliquée dans un DenseVector accessible par PCAModel.explainedVariance. Les composantes principales (vecteurs propres unitaires associés aux valeurs propres) sont retournées dans une matrice locale DenseMatrix de n lignes (le nombre de variables initiales) et k colonnes (le nombre de composantes principales demandées) accessible par PCAModel.pc. Les composantes principales sont triées en ordre décroissant des valeurs propres correspondantes.

L’ACP est réalisée en définissant une instance de PCA et en appelant la méthode .fit() pour cette instance. Le résultat est un PCAModel. La projection des données sur les k premières composantes principales est obtenue en appelant la méthode .transform() pour ce PCAModel.

Dans la suite, nous appliquerons l’ACP sur les données de Spambase Data Set, ainsi que sur un échantillon de ces données et sur des données aléatoires générées suivant une loi normale multidimensionnelle isotrope (c’est à dire de de même variance dans toutes les directions). Nous visualiserons les résultats.

Réaliser l’ACP

ACP sur le Spambase Data Set

La première opération consiste à regrouper les colonnes à analyser du DataFrame spamDF dans une nouvelle colonne de type Vector en utilisant le VectorAssembler :

import org.apache.spark.ml.feature.VectorAssembler

// Construction du VectorAssembler
val colsEntree = {for (i <- 0 until 57) yield "val"+i.toString}.toArray
val assembleur = new VectorAssembler().setInputCols(colsEntree).setOutputCol("features")

// Construction du Dataframe spamDFA en appliquant le VectorAssembler
val spamDFA = assembleur.transform(spamDF)
spamDFA.printSchema()
spamDFA.select("features").show(3)

La colonne features contient désormais un vecteur (type Vector) qui rassemble les 57 valeurs des colonnes initiales. Nous pouvons désormais appliquer l’ACP sur les données de spamDFA.select("features"). Par défaut, Spark applique une ACP centrée mais non normée :

import org.apache.spark.ml.feature.PCA

// Construction et application d'une nouvelle instance d'estimateur PCA
val pca = new PCA().setInputCol("features").setOutputCol("pcaFeatures")
                          .setK(3).fit(spamDFA)

// Application du « transformateur » PCAModel résultant de l'estimation
val resultat = pca.transform(spamDFA).select("pcaFeatures")

// Les 3 plus grandes valeurs propres (exprimées en proportion de variance expliquée)
println(pca.explainedVariance)

// Vecteurs propres correspondants
println(pca.pc)

Afin d’obtenir une ACP centrée et normée, il est nécessaire de normaliser les variables (les transformer pour que chaque variable soit de moyenne nulle et écart-type égal à 1) avant application de l’ACP. Pour cela, il est possible de faire appel à StandardScaler :

import org.apache.spark.ml.feature.StandardScaler

// Construction d'une nouvelle instance d'estimateur StandardScaler
val scaler = new StandardScaler().setInputCol("features").setOutputCol("scaledFeatures")
                                        .setWithStd(true).setWithMean(true)

// Application de la nouvelle instance d'estimateur StandardScaler
val scalerModel = scaler.fit(spamDFA)

La variable scalerModel contient désormais le modèle de normalisation adapté au jeu de données spamDFA. Il est possible d’inspecter les statistiques qui ont été estimées sur le DataFrame :

// Examen des moyennes et des variances calculées dans le StandardScalerModel résultant
println(scalerModel.mean)
println(scalerModel.std)

Une fois le modèle obtenu, il peut être appliqué sur n’importe quel jeu de données afin de le normaliser. Dans notre cas, nous l’utilisons pour normaliser (centrage et réduction) les éléments du jeu de données spamDFA :

// Application du transformer StandardScalerModel résultant
val scaledSpamDF = scalerModel.transform(spamDFA).select("scaledFeatures", "label")

// scaledSpamDF a une seule colonne de Vectors, nommée "scaledFeatures"
scaledSpamDF.printSchema

Question :

Appliquez l’ACP sur le DataFrame scaledSpamDF et mettez les projections des données sur les trois premiers axes principaux dans un DataFrame resultatCR. Examinez les trois plus grandes valeurs propres et comparez-les à celles obtenues précédemment. Que constatez-vous ?

Correction

// Construction et application d'une nouvelle instance d'estimateur PCA
val pcaCR = new PCA().setInputCol("scaledFeatures").setOutputCol("pcaCRFeatures")
                            .setK(3).fit(scaledSpamDF)

// Application du « transformateur » PCAModel résultant de l'estimation
val resultatCR = pcaCR.transform(scaledSpamDF).select("pcaCRFeatures")

// Les 3 plus grandes valeurs propres (exprimées en proportion de variance expliquée)
pcaCR.explainedVariance

Les écarts entre les 3 plus grandes valeurs propres sont plus faibles sur les données centrées et réduites que sur les données seulement centrées. Cela montre que les variables initiales avaient des variances très différentes (après centrage).

Question :

Appliquez l’ACP non normée sur un échantillon (appelé spamEch3) de 10% du DataFrame spamDF obtenu par échantillonnage simple et mettez les projections des données sur les trois premiers axes principaux dans un DataFrame resultatEch3. Examinez les trois plus grandes valeurs propres et comparez-les à celles obtenues dans l’ACP non normée de spamDF. Que constatez-vous ?

Correction

val spamEch3 = spamDFA.sample(false, 0.1)
val pcaEch3 = new PCA().setInputCol("features").setOutputCol("pcaFeatures")
                              .setK(3).fit(spamEch3)
val resultatEch3 = pcaEch3.transform(spamEch3).select("pcaFeatures")
pcaEch3.explainedVariance

Les valeurs propres obtenues sur un échantillon d’environ 10% sont naturellement différentes de celles obtenues sur la totalité des données, les écarts (relatifs) étant toutefois plus faible pour les valeurs propres les plus grandes que pour les suivantes.

ACP sur des données aléatoires

À titre d’exemple, nous pouvons également appliquer l’ACP sur des données tridimensionnelles obtenues par tirage aléatoire suivant une loi normale isotrope :

import org.apache.spark.sql.functions.randn
import org.apache.spark.ml.feature.PCA

// Création d'un Dataframe d'identifiants entre 0 et 4600
val idsDF = spark.range(4601)

// Création d'un dataframe avec 3 colonnes de valeurs aléatoires
// tirées d'une loi normale d'espérance nulle et écart-type 1
val randnDF = idsDF.select($"id", randn(seed=1).alias("normale0"),
                                         randn(seed=2).alias("normale1"),
                                         randn(seed=3).alias("normale2"))
// Vérification
randnDF.describe().show()

// Construction du VectorAssembler
val colsEntRnd = {for (i <- 0 until 3) yield "normale"+i.toString}.toArray
val assRnd = new VectorAssembler().setInputCols(colsEntRnd).setOutputCol("features")

// Construction du Dataframe randnDF2 en appliquant le VectorAssembler
val randnDF2 = assRnd.transform(randnDF)
randnDF2.printSchema()

// Application de l'ACP
val pcaRnd = new PCA().setInputCol("features").setOutputCol("pcaRndFeatures")
                             .setK(3).fit(randnDF2)
val resRnd = pcaRnd.transform(randnDF2).select("pcaRndFeatures")
pcaRnd.explainedVariance

Question :

Examinez les trois premières valeurs propres et expliquez le résultat obtenu.

Correction :

Les valeurs propres obtenues sur les données aléatoires suivant une loi normale isotrope sont (pratiquement) égales, le nuage de points est presque sphérique ; les vecteurs propres ne sont pas utiles dans ce cas car une faible perturbation dans l’échantillon aléatoire suivant la loi normale isotrope aura comme conséquence un changement potentiellement fort dans les vecteurs propres associés aux trois plus grandes valeurs propres.

Approfondissement : visualiser les résultats avec Vegas

Dans cette séance de travaux pratiques nous visualiserons les données à l’aide de la bibliothèque Vegas pour Scala.

Nous souhaitons désormais produire un nuage de points en deux dimensions représentant chaque courriel projeté sur les deux axes principaux de l’ACP.

Question

Réaliser une analyse en composantes centrée réduite sur scaledSpamDF mais en ne conservant cette fois-ci que les deux premiers axes principaux. Mettre le résultat dans une variable resultatCR_2D.

Correction

Il s’agit simplement d’appliquer sur les données centrées-réduites une ACP avec \(k = 2\).

val pcaCR = new PCA().setInputCol("scaledFeatures").setOutputCol("pcaCRFeatures")
                     .setK(2).fit(scaledSpamDF)

// Application du « transformateur » PCAModel résultant de l'estimation
val resultatCR_2D = pcaCR.transform(scaledSpamDF).select("pcaCRFeatures")

Afin de réaliser la visualisation avec Vegas, il est nécessaire de transformer les vecteurs features de l’ACP en deux colonnes séparées contenant les coordonnées en x et y.

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.udf
// Extrait le premier scalaire d'un vecteur et le place dans une colonne
val first = udf((v: Vector) => v.toArray(0))
// Extrait le deuxième scalaire d'un vecteur et le place dans une colonne
val second = udf((v: Vector) => v.toArray(1))

val coordinates = resultatCR_2D.withColumn("x", first($"pcaCRFeatures")).withColumn("y", second($"pcaCRFeatures"))
coordinates.printSchema()

Le DataFrame résultant contient maintenant deux nouvelles colonnes x et y qui contiennent les coordonnées de chaque élément du DataFrame initial spamDF projeté selon les deux axes principaux après ACP.

Nous pouvons maintenant effectuer la visualisation avec Vegas.

Pour ce TP nous supposons que vous travaillez avec Jupyter. Si ce n’est pas le cas, vous devrez adapter la variable render pour utiliser le moteur de rendu adéquat.

import vegas._
implicit val render = vegas.render.ShowHTML(kernel.display.content("text/html", _))

import vegas.data.External._
import vegas.sparkExt._
Vegas("PCA 2D").withDataFrame(coordinates).mark(Point).encodeX("x", Quant).encodeY("y", Quant).show

L’utilisation de Vegas se fait de la façon suivante : withDataFrame permet de spécifier que l’on veut visualiser le contenu d’un DataFrame spécifique, mark indique que nos observations seront identifiées par des points et .encodeX/encodeY précisent quelles colonnes correspondent aux coordonnées des points. Les paramètres Quant signifie que les colonnes x et y représentent des variables quantitatives. Les variables nominatives sont spécifiées par Nom.

Question

Réaliser la même visualisation dans le cas des données aléatoires. Que constatez-vous ?

Correction :

val output = resRnd.withColumn("x", first($"pcaRndFeatures")).withColumn("y", second($"pcaRndFeatures"))
Vegas("PCA 2D").withDataFrame(output).mark(Point).encodeX("x", Quant).encodeY("y", Quant).show

Dans le cas des données aléatoires, l’analyse en composantes principales ne permet pas de déterminer de direction principale car la distribution des données est isotrope. Par conséquent, la visualisation obtenue est simplement une boule bidimensionnelle.

Question

Pour terminer, effectuez cette visualisation mais en ajoutant une couleur différente en fonction de l’étiquette du point considéré.

Correction

Pour visualiser les étiquettes, il faut conserver la colonne label après l’ACP :

val resultatCR_2D = pcaCR.transform(scaledSpamDF).select("pcaCRFeatures", "label")

On peut ensuite ajouter l’option .encodeColor("label", Nom) à la visualisation précédente.

val output = resultatCR_2D.withColumn("x", first($"pcaCRFeatures")).withColumn("y", second($"pcaCRFeatures"))
Vegas("PCA 2D").withDataFrame(output).mark(Point).encodeX("x", Quant).encodeY("y", Quant).encodeColor("label", Nom).show