Travaux pratiques - Echantillonnage. Analyse en composantes principales

(la version précédente de cette séance, utilisant l’API RDD)

Références externes utiles :

Les exemples ci-dessous reprennent, en partie, ceux des documentations en ligne.

Echantillonnage

Nous travaillerons lors de cette séance sur des données Spambase Data Set issues de l’archive de l’UCI. Vous trouverez sur le site de l’UCI des explications plus détaillées concernant ces données, regardez-les, notamment la signification des variables et les classes auxquelles appartiennent les données. Pour charger ces données, ouvrez une fenêtre terminal et entrez les commandes suivantes (recopier sur une même ligne les deux parties du lien HTTP) :

$ cd
$ mkdir -p tpacp/data
$ cd tpacp/data
$ wget https://archive.ics.uci.edu/ml/
         machine-learning-databases/spambase/spambase.data --no-check-certificate

Dans le fichier spambase.data, chaque ligne contient un vecteur de 57 valeurs float et ensuite une étiquette de classe (0 ou 1). Pour lire ce fichier et créer la structure de données qui sera utilisée dans la suite, lancez spark-shell dans une nouvelle fenêtre terminal. Entrez

$ cd tpacp
$ spark-shell --driver-memory 1g

Si la commande spark-shell n’est pas trouvée, entrez d’abord export PATH="$PATH:/opt/spark/bin" et ensuite seulement spark-shell --driver-memory 1g. Le paramètre --driver-memory permet de demander l’utilisation de plus de mémoire pour la machine virtuelle java (par défaut seulement 512 KOctets).

Ensuite, dans l’interpréteur de commandes Spark :

scala> import org.apache.spark.sql.types._

// Fonction qui produit le schéma du Dataframe sans la dernière colonne
scala> def genSpamFields(from: Int, to: Int) = for (i <- from until to)
                         yield StructField("val"+i.toString, DoubleType, true)

// Génération des StructField successifs correspondant aux 57 premières colonnes
scala> val spamFields = genSpamFields(0, 57)

// Construction du schéma complet du Dataframe (incluant la dernière colonne)
scala> val spamSchema = StructType(spamFields).add("label", DoubleType, true)

// Lecture des données et création du Dataframe
scala> val spamDF = spark.read.format("csv").schema(spamSchema).load("data/spambase.data").cache()
scala> spamDF.printSchema()    // vérification

Nous nous servirons du DataFrame spamDF dans la suite de cette séance.

Question :

Expliquez la construction de spamSchema.

Correction :

Un schéma StructType de DataFrame est une succession de StructField. Dans les données de spambase.data, les 57 premières colonnes sont des float et la 58ème est un int correspondant à l’étiquette de classe (0 signifie non spam, 1 signifie spam). La fonction genSpamFields() produit une séquence de StructField. Dans spamFields nous obtenons donc 57 StructField avec des noms "val...", de type DoubleType et dont des valeurs peuvent être absentes (signification de true). cette séquence est convertie en StructType et on lui ajoute à la fin le StructField correspondant à la dernière colonne, l’étiquette (le type DoubleType est conservé pour des considérations pratiques).

Echantillonnage simple

Nous obtiendrons d’abord deux DataFrame, spamEch1 et spamEch2, qui sont des échantillons du DataFrame spamDF (réduit aux 3 premières colonnes) avec deux valeurs très différentes pour le taux de sélection (paramètre fraction dans Dataset.sample()) :

// Combien de lignes de chaque classe contient spamDF (attention opération coûteuse)
scala> spamDF.groupBy("label").count().show()
+-----+-----+
|label|count|
+-----+-----+
|  0.0| 2788|
|  1.0| 1813|
+-----+-----+

// Définition d'échantillons (échantillonnage simple)
scala> val spamEch1 = spamDF.select("val0", "val1", "label").sample(false, 0.5)
scala> spamEch1.count()
scala> val spamEch2 = spamDF.select("val0", "val1", "label").sample(false, 0.1)
scala> spamEch2.count()

// Opérations possibles aussi mais coûteuses
scala> spamEch1.groupBy("label").count().show()
scala> spamEch2.groupBy("label").count().show()

La réduction du DataFrame à 3 colonnes (avec .select("val0", "val1", "label")) est simplement effectuée pour améliorer la lisibilité de certaines des opérations suivantes.

Question :

Que pouvez-vous constater en comparant le nombre de lignes de spamEch1 ou spamEch2 et celui de spamDF multiplié par la valeur de fraction correspondante ? Refaites l’échantillonnage (avec les mêmes valeurs de taux de sélection) et comparez. Expliquez.

Correction :

Il y a un écart important entre 4601 * 0.5 et le nombre de lignes de spamEch1, ainsi qu’entre 4601 * 0.1 et le nombre de lignes de spamEch2. De plus, si on refait l’échantillonnage on n’obtient pas nécessairement le même nombre de lignes. La valeur de fraction représente la probabilité de retenir une ligne du DataFrame de départ dans l’échantillon (même valeur valable pour toutes les lignes) et non un “taux de sélection” dans le sens nb_lignes_DataFrame_de_départ * taux_de_sélection = nb_lignes_DataFrame_échantillon.

Examinons maintenant des statistiques simples calculées sur les colonnes des 3 DataFrame :

scala> val summary  = spamDF.select("val0", "val1", "label").describe()
scala> val summary1 = spamEch1.describe()
scala> val summary2 = spamEch2.describe()
scala> summary.show()
scala> summary1.show()
scala> summary2.show()

Question :

Quel constat faites-vous en comparant les moyennes calculées pour une même variable (colonne) sur spamDF.select("val0", "val1", "label") et sur les deux échantillons ? Expliquez.

Correction :

Les estimations faites à partir d’un échantillon sont (naturellement) différentes des valeurs obtenues sur la totalité des données et l’erreur d’estimation augmente en général avec la réduction de la taille de l’échantillon.

Echantillonnage stratifié

L’échantillonnage stratifié permet de fixer le taux d’échantillonnage pour chaque strate. Pour un DataFrame, les strates sont définis par les valeurs d’une des colonnes (une valeur définit un strate). La méthode .sampleBy() de DataFrameStatFunctions (voir la documentation API Spark en Scala) respecte de façon approximative les taux d’échantillonnage indiqués. Il est nécessaire d’indiquer la valeur de seed (d’initialisation) pour le générateur aléatoire (ci-dessous 1L).

// Définition des fractions par strate
scala> val fractions = Map(0.0 -> 0.5, 1.0 -> 0.5)    // Map[K, Double]

// Echantillonnage stratifié (approximatif)
scala> val spamEchStratifie = spamDF.select("val0", "val1", "label").stat.sampleBy("label", fractions, 1L)
scala> spamEchStratifie.groupBy("label").count().show()

Question :

Refaites l’échantillonnage avec une autre valeur de seed et comparez les résultats. Que constatez-vous ?

Correction :

Avec une autre valeur, les effectifs des strates dans l’échantillon sont assez sensiblement différents.

Dans l’état actuel de développement de l’API Dataframe, l’échantillonnage stratifié plus précis (respectant de plus près les taux de sélection indiqués dans le Map()) n’est pas directement accessible. Il est en revanche possible de passer pour cela par l’API RDD, dans laquelle on trouve une méthode sampleByKeyExact qui répond à cette attente (tout en étant plus coûteuse) :

scala> import org.apache.spark.sql.Row
scala> import org.apache.spark.rdd.RDD
scala> import org.apache.spark.rdd.PairRDDFunctions
scala> import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
scala> import org.apache.spark.ml.linalg.Vectors

// Echantillonnage stratifié exact avec .sampleByKeyExact() en passant par un RDD
scala> val spamEchStratRDD = spamDF.select("val0", "val1", "label").rdd
           .map(l => (l.getAs[Double](2), Vectors.dense(l.getAs[Double](0),l.getAs[Double](1))))
           .sampleByKeyExact(false, fractions).map(l => Row(l._1, l._2))

// Définition du schéma pour le Dataframe
scala> val schemaReduit = StructType(Seq(StructField("label", DoubleType, true),
                                         StructField("valeurs", VectorType, true)))

// Reconversion en Dataframe
scala> val spamEchStratifieExact = spark.createDataFrame(spamEchStratRDD, schemaReduit)

// Examen de l'échantillon
scala> spamEchStratifieExact.groupBy("label").count().show()
+-----+-----+
|label|count|
+-----+-----+
|  0.0| 1394|
|  1.0|  907|
+-----+-----+

L’échantillon spamEchStratifieExact respecte mieux les taux de sélection demandés dans fractions mais est bien plus coûteux à obtenir que spamEchStratifie.

Analyse en composantes principales (ACP)

MLlib donne la possibilité de calculer directement les k premières composantes principales pour un ensemble d’observations. Les N observations sont représentées par une ou plusieurs colonnes d’un DataFrame dont le nombre de lignes est égal au nombre d’observations (N). La DataFrame peut contenir une colonne de type Vector avec autant de composantes qu’il y a de variables soumises à l’ACP (nombre noté ici par n), ou alors plusieurs colonnes qu’il faut regrouper au préalable pour obtenir une colonne de type Vector.

Les k premières composantes principales sont les vecteurs propres d’une ACP centrée mais non normée (les variables sont transformées pour être de moyenne nulle mais la variance n’est pas modifiée pour devenir égale à 1) appliquée aux observations. Les valeurs propres sont retournées triées par ordre décroissant et exprimées en proportion de variance expliquée dans un DenseVector accessible par PCAModel.explainedVariance. Les composantes principales (vecteurs propres unitaires associés aux valeurs propres) sont retournées dans une matrice locale DenseMatrix de n lignes (le nombre de variables initiales) et k colonnes (le nombre de composantes principales demandées) accessible par PCAModel.pc. Les composantes principales sont triées en ordre décroissant des valeurs propres correspondantes.

L’ACP est réalisée en définissant une instance de PCA et en appelant la méthode .fit() pour cette instance. Le résultat est un PCAModel. La projection des données sur les k premières composantes principales est obtenue en appelant la méthode .transform() pour ce PCAModel.

Dans la suite, nous appliquerons l’ACP sur les données de Spambase Data Set, ainsi que sur un échantillon de ces données et sur des données aléatoires générées suivant une loi normale multidimensionnelle isotrope (c’est à dire de de même variance dans toutes les directions). Nous visualiserons les résultats.

Réaliser l’ACP

La première opération consiste à regrouper les colonnes à analyser du DataFrame spamDF dans une nouvelle colonne de type Vector en utilisant le VectorAssembler :

scala> import org.apache.spark.ml.feature.VectorAssembler

// Construction du VectorAssembler
scala> val colsEntree = {for (i <- 0 until 57) yield "val"+i.toString}.toArray
scala> val assembleur = new VectorAssembler().setInputCols(colsEntree).setOutputCol("features")

// Construction du Dataframe spamDFA en appliquant le VectorAssembler
scala> val spamDFA = assembleur.transform(spamDF)
scala> spamDFA.printSchema()
scala> spamDFA.select("features").show(3)

Application de l’ACP centrée (mais non normée) sur les données de spamDFA.select("features") :

scala> import org.apache.spark.ml.feature.PCA

// Construction et application d'une nouvelle instance d'estimateur PCA
scala> val pca = new PCA().setInputCol("features").setOutputCol("pcaFeatures")
                          .setK(3).fit(spamDFA)

// Application du « transformateur » PCAModel résultant de l'estimation
scala> val resultat = pca.transform(spamDFA).select("pcaFeatures")

// Les 3 plus grandes valeurs propres (exprimées en proportion de variance expliquée)
scala> pca.explainedVariance

// Vecteurs propres correspondants
scala> pca.pc

Afin d’obtenir une ACP normée il est nécessaire de normaliser les variables avant application de l’ACP. Pour cela il est nécessaire de faire appel à StandardScaler :

scala> import org.apache.spark.ml.feature.StandardScaler

// Construction d'une nouvelle instance d'estimateur StandardScaler
scala> val scaler = new StandardScaler().setInputCol("features").setOutputCol("scaledFeatures")
                                        .setWithStd(true).setWithMean(true)

// Application de la nouvelle instance d'estimateur StandardScaler
scala> val scalerModel = scaler.fit(spamDFA)

// Examen des moyennes et des variances calculées dans le StandardScalerModel résultant
scala> scalerModel.mean
scala> scalerModel.std

// Application du transformer StandardScalerModel résultant
scala> val scaledSpamDF = scalerModel.transform(spamDFA).select("scaledFeatures")

// scaledSpamDF a une seule colonne de Vectors, nommée "scaledFeatures"
scala> scaledSpamDF.printSchema

Question :

Appliquez l’ACP sur le DataFrame scaledSpamDF et mettez les projections des données sur les trois premiers axes principaux dans un DataFrame resultatCR. Examinez les trois plus grandes valeurs propres et comparez-les à celles obtenues précédemment. Que constatez-vous ?

Correction

// Construction et application d'une nouvelle instance d'estimateur PCA
scala> val pcaCR = new PCA().setInputCol("scaledFeatures").setOutputCol("pcaCRFeatures")
                            .setK(3).fit(scaledSpamDF)

// Application du « transformateur » PCAModel résultant de l'estimation
scala> val resultatCR = pcaCR.transform(scaledSpamDF).select("pcaCRFeatures")

// Les 3 plus grandes valeurs propres (exprimées en proportion de variance expliquée)
scala> pcaCR.explainedVariance

Les écarts entre les 3 plus grandes valeurs propres sont plus faibles sur les données centrées et réduites que sur les données centrées. Cela montre que les variables initiales avaient des variances très différentes (après centrage).

Question :

Appliquez l’ACP non normée sur un échantillon (appelé spamEch3) de 10% du DataFrame spamDF obtenu par échantillonnage simple et mettez les projections des données sur les trois premiers axes principaux dans un DataFrame resultatEch3. Examinez les trois plus grandes valeurs propres et comparez-les à celles obtenues dans l’ACP non normée de spamDF. Que constatez-vous ?

Correction

scala> val spamEch3 = spamDFA.sample(false, 0.1)
scala> val pcaEch3 = new PCA().setInputCol("features").setOutputCol("pcaFeatures")
                              .setK(3).fit(spamEch3)
scala> val resultatEch3 = pcaEch3.transform(spamEch3).select("pcaFeatures")
scala> pcaEch3.explainedVariance

Les valeurs propres obtenues sur un échantillon d’env. 10% sont naturellement différentes de celles obtenues sur la totalité des données, les écarts (relatifs) étant toutefois plus faible pour les valeurs propres les plus grandes que pour les suivantes.

Nous pouvons maintenant appliquer l’ACP sur des données tridimensionnelles obtenues par tirage aléatoire suivant une loi normale isotrope :

// Création d'un Dataframe d'identifiants entre 0 et 4600
scala> val idsDF = spark.range(4601)

// Création d'un dataframe avec 3 colonnes de valeurs alatoires
//   tirées d'une loi normale d'espérance nulle et écart-type 1
scala> val randnDF = idsDF.select($"id", randn(seed=1).alias("normale0"),
                                         randn(seed=2).alias("normale1"),
                                         randn(seed=3).alias("normale2"))
// Vérification
scala> randnDF.describe().show()

// Construction du VectorAssembler
scala> val colsEntRnd = {for (i <- 0 until 3) yield "normale"+i.toString}.toArray
scala> val assRnd = new VectorAssembler().setInputCols(colsEntRnd).setOutputCol("features")

// Construction du Dataframe randnDF2 en appliquant le VectorAssembler
scala> val randnDF2 = assRnd.transform(randnDF)
scala> randnDF2.printSchema()

// Application de l'ACP
scala> val pcaRnd = new PCA().setInputCol("features").setOutputCol("pcaRndFeatures")
                             .setK(3).fit(randnDF2)
scala> val resRnd = pcaRnd.transform(randnDF2).select("pcaRndFeatures")
scala> pcaRnd.explainedVariance

Question :

Examinez les trois premières valeurs propres et expliquez le résultat obtenu.

Correction :

Les valeurs propres obtenues sur les données aléatoires suivant une loi normale isotrope sont (pratiquement) égales, le nuage de points est presque sphérique ; les vecteurs propres ne sont pas utiles dans ce cas car une faible perturbation dans l’échantillon aléatoire suivant la loi normale isotrope aura comme conséquence un changement potentiellement fort dans les vecteurs propres associés aux trois plus grandes valeurs propres.

Visualiser les résultats

Dans cette séance de travaux pratiques nous visualiserons les données à l’aide d’un outil simple qu’est gnuplot. Vous pouvez vous servir d’autres outils (comme matplotlib ou ggplot2) si vous les maîtrisez déjà ; il vous faudra toutefois les installer.

Vérifiez si gnuplot est présent : entrez gnuplot dans une fenêtre terminal, si le programme est installé alors vous aurez un prompt gnuplot>, quittez en entrant quit. Dans la sale de travaux pratiques gnuplot est bien présent.

Si vous travaillez sur un autre ordinateur et gnuplot n’est pas présent sur votre système il est nécessaire de l’installer. Pour cela, entrez la commande suivante dans une fenêtre terminal (ou une commande adaptée à votre distribution linux) :

$ sudo apt-get install gnuplot

Gnuplot peut être utilisé avec des commandes en ligne ou avec des scripts. Pour écrire le script de base qui permettra la visualisation, ouvrez un éditeur (par ex. KWrite) et copiez dans la fenêtre de l’éditeur le texte suivant :

set datafile separator ','
set term x11 0
set title "Projections donnees spambase"
set xlabel "CP 1"
set ylabel "CP 2"
set zlabel "CP 3"
splot "data/acpspam/projSpam" using 1:2:3
pause -1
#
set term x11 1
set title "Projections donnees echantillon spambase"
splot "data/acpech2/projEch3" using 1:2:3
pause -1
#
set term x11 2
set title "Projections donnees aleatoires"
splot "data/acprnd/projRnd" using 1:2:3
pause -1

Enregistrez-le dans un fichier visualisation.gp dans le répertoire ~/tpacp (ici ~/ indique votre répertoire $HOME).

Il est maintenant nécessaire d’enregistrer sur disque les données projetées, dans un format adapté à la lecture par gnuplot : un vecteur par lignes, composantes séparées par un espace (ou une tabulation). Pour cela, entrez les commandes suivantes :

// Enregistrement des projections des données spambase sur les 3 premières CP
scala> resultatCR.repartition(1).map(v => v.toString.filter(c => c != '[' & c != ']'))
                 .write.text("data/acpspam")

// Enregistrement des projections des données de l'échantillon
//   Ici resultatEch3 est le nom du Dataframe qui garde les résultats pour l'échantillon
scala> resultatEch3.repartition(1).map(v => v.toString.filter(c => c != '[' & c != ']'))
                   .write.text("data/acpech3")

// Enregistrement des projections des données aléatoires
scala> resRnd.repartition(1).map(v => v.toString.filter(c => c != '[' & c != ']'))
             .write.text("data/acprnd")

Ces commandes ont créé des sous-répertoires acpspam, acpech2 et acprnd dans le répertoire /home/cloudera/tpacp/data. Chacun de ces sous-répertoires contient un fichier avec les données correspondantes, portant le nom part-00000.... Renommez ces fichiers respectivement projSpam, projEch3 et projRnd (avec les commandes linux mv ou avec le gestionnaire de fichiers).

Attention, l’opération .repartition() devrait être autant que possible évitée sur un DataFrame très volumineux, distribué sur un cluster, car très coûteuse. Aussi, .repartition(1) appliqué à un DataFrame volumineux, distribué, peut avoir pour conséquence un débordement mémoire. Enfin, enregistrer le contenu d’un DataFrame volumineux sous forme texte est une solution très inefficace, un format mieux adapté (par ex. parquet) devrait être préféré.

Pour visualiser les résultats, entrez les commandes suivantes dans une fenêtre terminal dans le répertoire ~/tpacp :

$ gnuplot
gnuplot> load "visualisation.gp"

En appuyant sur Entrée, vous ouvrez successivement chacun des trois fichiers, avec la possibilité de changer de point de vue avec la souris (clic gauche).