.. _chap-tpComposantesPrincipales: ####################################################################### Travaux pratiques - Echantillonnage. Analyse en composantes principales ####################################################################### .. only:: html .. container:: notebook .. image:: _static/zeppelin_classic_logo.png :class: svg-inline `Cahier Zeppelin `_ (`la version précédente de cette séance, utilisant l'API RDD `_) Références externes utiles : * `Documentation Spark `_ * `Documentation API Spark en Scala `_ * `Documentation Scala `_ **L'objectif** de cette séance de TP est de présenter les fonctionnalités proposées par Spark pour l'échantillonnage des données, ainsi que l'utilisation de l'analyse en composantes principales (ACP) dans Spark. Les exemples ci-dessous reprennent, en partie, ceux des documentations en ligne. **************** Échantillonnage **************** Lecture des données =================== Nous travaillerons lors de cette séance sur des données `Spambase Data Set issues de l'archive de l'UCI `_. Il s'agit d'une collection de courriels étiquetés comme *spam* (indésirable) ou normal. Vous trouverez sur le site de l'UCI des explications plus détaillées concernant ces données, regardez-les, notamment la signification des variables et les classes auxquelles appartiennent les données. Pour charger ces données, sous Linux et MacOS, ouvrez une fenêtre terminal et entrez les commandes suivantes (ou exécutez directement la cellule dans Zeppelin) : .. code-block:: bash %%bash mkdir -p tpacp/data cd tpacp/data wget -nc https://archive.ics.uci.edu/ml/machine-learning-databases/spambase/spambase.data Si vous êtes sous Windows, copiez et collez l'URL dans un navigateur et enregistrez le fichier dans un répertoire ``data``. Dans le fichier ``spambase.data``, chaque ligne contient un vecteur de 57 valeurs ``float`` et ensuite une étiquette de classe (0 ou 1). Nous allons maintenant importer ces données dans Spark. Si vous n'utilisez pas le cahier Zeppelin, placez vous dans le répertoire de votre TP (``cd tpacp``) et lancez ``spark-shell --driver-memory 1g`` dans une nouvelle fenêtre terminal. L'option ``--driver-memory 1g`` permet de spécifier à la machine virtuelle Java que l'on souhaite allouer 1 Go de mémoire (plutôt que 512 Mo) par défaut, ce qui est un peu juste pour ce TP. Si la commande ``spark-shell`` n'est pas trouvée, entrez d'abord ``export PATH="$PATH:/opt/spark/bin"`` et ensuite seulement ``spark-shell --driver-memory 1g``. Ensuite, dans l'interpréteur de commandes Spark : .. only:: jupyter .. code-block:: scala %spark.dep z.load("org.vegas-viz:vegas_2.11:0.3.11") z.load("org.vegas-viz:vegas-spark_2.11:0.3.11") .. code-block:: scala import org.apache.spark.sql.types._ // Fonction qui produit le schéma du Dataframe sans la dernière colonne def genSpamFields(from: Int, to: Int) = for (i <- from until to) yield StructField("val"+i.toString, DoubleType, true) // Génération des StructField successifs correspondant aux 57 premières colonnes val spamFields = genSpamFields(0, 57) // Construction du schéma complet du Dataframe (incluant la dernière colonne) val spamSchema = StructType(spamFields).add("label", DoubleType, true) // Lecture des données et création du Dataframe val spamDF = spark.read.format("csv").schema(spamSchema).load("tpacp/data/spambase.data").cache() spamDF.printSchema() // vérification Nous nous servirons du *DataFrame* ``spamDF`` dans la suite de cette séance. .. admonition:: Question : Expliquez la construction de ``spamSchema``. .. ifconfig:: tpscala in ('public') .. admonition:: Correction : Un schéma ``StructType`` de *DataFrame* est une succession de ``StructField``. Dans les données de ``spambase.data``, les 57 premières colonnes sont des ``float`` et la 58ème est un ``int`` correspondant à l'étiquette de classe (0 signifie non spam, 1 signifie spam). La fonction ``genSpamFields()`` produit une séquence de ``StructField``. Dans ``spamFields`` nous obtenons donc 57 StructField avec des noms ``"val..."``, de type ``DoubleType`` et dont des valeurs peuvent être absentes (signification de ``true``). cette séquence est convertie en ``StructType`` et on lui ajoute à la fin le ``StructField`` correspondant à la dernière colonne, l'étiquette (le type ``DoubleType`` est conservé pour des considérations pratiques). Échantillonnage simple ====================== Nous allons d'abord générer deux *DataFrame*, ``spamEch1`` et ``spamEch2``, qui sont des échantillons du *DataFrame* ``spamDF`` (réduit aux 3 premières colonnes) avec deux valeurs très différentes pour le taux de sélection (paramètre ``fraction`` dans ``Dataset.sample()``) : .. code-block:: scala // Combien de lignes de chaque classe contient spamDF (attention opération coûteuse) spamDF.groupBy("label").count().show() .. code-block:: scala // Définition d'échantillons (échantillonnage simple) // On ne garde que les colonnes "val0", "val1" et "label" val spamEch1 = spamDF.select("val0", "val1", "label").sample(false, 0.5) println(spamEch1.count()) val spamEch2 = spamDF.select("val0", "val1", "label").sample(false, 0.1) println(spamEch2.count()) .. code-block:: scala // Opérations possibles aussi mais coûteuses spamEch1.groupBy("label").count().show() spamEch2.groupBy("label").count().show() La réduction du *DataFrame* à 3 colonnes (avec ``.select("val0", "val1", "label")``) est simplement effectuée pour améliorer la lisibilité de certaines des opérations suivantes. .. admonition:: Question : Que pouvez-vous constater en comparant le nombre de lignes de ``spamEch1`` ou ``spamEch2`` et celui de ``spamDF`` multiplié par la valeur de ``fraction`` correspondante ? Refaites l'échantillonnage (avec les mêmes valeurs de taux de sélection) et comparez. Expliquez. .. ifconfig:: tpscala in ('public') .. admonition:: Correction : Il y a un écart important entre 4601 * 0.5 et le nombre de lignes de spamEch1, ainsi qu'entre 4601 * 0.1 et le nombre de lignes de spamEch2. De plus, si on refait l'échantillonnage on n'obtient pas nécessairement le même nombre de lignes. La valeur de ``fraction`` représente la probabilité de retenir une ligne du *DataFrame* de départ dans l'échantillon (même valeur valable pour toutes les lignes) et non un "taux de sélection" dans le sens nb_lignes_DataFrame_de_départ * taux_de_sélection = nb_lignes_DataFrame_échantillon. Examinons maintenant des statistiques simples calculées sur les colonnes des 3 *DataFrame* : .. code-block:: scala val summary = spamDF.select("val0", "val1", "label").describe() val summary1 = spamEch1.describe() val summary2 = spamEch2.describe() summary.show() summary1.show() summary2.show() .. admonition:: Question : Quel constat faites-vous en comparant les moyennes calculées pour une même variable (colonne) sur ``spamDF.select("val0", "val1", "label")`` et sur les deux échantillons ? Expliquez. .. ifconfig:: tpscala in ('public') .. admonition:: Correction : Les estimations faites à partir d'un échantillon sont (naturellement) différentes des valeurs obtenues sur la totalité des données et l'erreur d'estimation augmente en général avec la réduction de la taille de l'échantillon. Échantillonnage stratifié ========================= L'échantillonnage stratifié permet de fixer le taux d'échantillonnage pour chaque strate. Pour un *DataFrame*, les strates sont définis par les valeurs uniques d'une des colonnes (une valeur définit un strate). La méthode ``.sampleBy()`` de ``DataFrameStatFunctions`` (voir la `documentation API Spark en Scala `_) respecte de façon approximative les taux d'échantillonnage indiqués. Il est **nécessaire** d'indiquer la valeur d'initialisation (*seed*) du générateur aléatoire (ci-dessous ``1L``). .. code-block:: scala // Définition des fractions par strate val fractions = Map(0.0 -> 0.5, 1.0 -> 0.5) // Map[K, Double] // Échantillonnage stratifié (approximatif) val spamEchStratifie = spamDF.select("val0", "val1", "label").stat.sampleBy("label", fractions, 1L) spamEchStratifie.groupBy("label").count().show() .. admonition:: Question : Refaites l'échantillonnage avec une autre valeur de *seed* et comparez les résultats. Que constatez-vous ? .. ifconfig:: tpscala in ('public') .. admonition:: Correction : Avec une autre valeur, les effectifs des strates dans l'échantillon sont assez sensiblement différents. Dans l'état actuel de développement de l'API *Dataframe*, l'échantillonnage stratifié plus précis (respectant de plus près les taux de sélection indiqués dans le ``Map()``) n'est pas directement accessible. Il est en revanche possible de passer pour cela par l'API RDD, dans laquelle on trouve une méthode ``sampleByKeyExact`` qui répond à cette attente (tout en étant plus coûteuse) : .. code-block:: scala import org.apache.spark.sql.Row import org.apache.spark.rdd.RDD import org.apache.spark.rdd.PairRDDFunctions import org.apache.spark.ml.linalg.SQLDataTypes.VectorType import org.apache.spark.ml.linalg.Vectors // Echantillonnage stratifié exact avec .sampleByKeyExact() en passant par un RDD val spamEchStratRDD = spamDF.select("val0", "val1", "label").rdd .map(l => (l.getAs[Double](2), Vectors.dense(l.getAs[Double](0),l.getAs[Double](1)))) .sampleByKeyExact(false, fractions).map(l => Row(l._1, l._2)) // Définition du schéma pour le Dataframe val schemaReduit = StructType(Seq(StructField("label", DoubleType, true), StructField("valeurs", VectorType, true))) // Reconversion en Dataframe val spamEchStratifieExact = spark.createDataFrame(spamEchStratRDD, schemaReduit) // Examen de l'échantillon spamEchStratifieExact.groupBy("label").count().show() .. val spamEchStratRDD = spamDF.select("val0", "val1", "label").rdd.map(l => (l.getAs[Double](2), Vectors.dense({ for (i <- 0 until 2) yield l.getAs[Double](i)}.toArray))).sampleByKeyExact(false, fractions).map(l => Row(l._1, l._2)) L'échantillon ``spamEchStratifieExact`` respecte mieux les taux de sélection demandés dans ``fractions`` mais est bien plus coûteux à obtenir que ``spamEchStratifie``. **************************************** Analyse en composantes principales (ACP) **************************************** MLlib donne la possibilité de calculer directement les *k* premières composantes principales pour un ensemble d'observations. Les *N* observations sont représentées par une ou plusieurs colonnes d'un *DataFrame* dont le nombre de lignes est égal au nombre d'observations (*N*). Le *DataFrame* peut contenir une colonne de type ``Vector`` avec autant de composantes qu'il y a de variables soumises à l'ACP (nombre noté ici par *n*), ou alors plusieurs colonnes qu'il faut **regrouper au préalable** pour obtenir une colonne de type ``Vector``. Les *k* premières composantes principales sont les vecteurs propres d'une ACP centrée mais non normée (les variables sont transformées pour être de moyenne nulle mais la variance n'est pas modifiée pour devenir égale à 1) appliquée aux observations. Les valeurs propres sont retournées triées par ordre décroissant et exprimées en proportion de variance expliquée dans un ``DenseVector`` accessible par ``PCAModel.explainedVariance``. Les composantes principales (vecteurs propres unitaires associés aux valeurs propres) sont retournées dans une matrice locale ``DenseMatrix`` de *n* lignes (le nombre de variables initiales) et *k* colonnes (le nombre de composantes principales demandées) accessible par ``PCAModel.pc``. Les composantes principales sont triées en ordre décroissant des valeurs propres correspondantes. L'ACP est réalisée en définissant une instance de *PCA* et en appelant la méthode ``.fit()`` pour cette instance. Le résultat est un *PCAModel*. La projection des données sur les *k* premières composantes principales est obtenue en appelant la méthode ``.transform()`` pour ce *PCAModel*. Dans la suite, nous appliquerons l'ACP sur les données de *Spambase Data Set*, ainsi que sur un échantillon de ces données et sur des données aléatoires générées suivant une loi normale multidimensionnelle isotrope (c'est à dire de de même variance dans toutes les directions). Nous visualiserons les résultats. Réaliser l'ACP ============== ACP sur le Spambase Data Set ---------------------------- La première opération consiste à regrouper les colonnes à analyser du *DataFrame* ``spamDF`` dans une nouvelle colonne de type ``Vector`` en utilisant le ``VectorAssembler`` : .. code-block:: scala import org.apache.spark.ml.feature.VectorAssembler // Construction du VectorAssembler val colsEntree = {for (i <- 0 until 57) yield "val"+i.toString}.toArray val assembleur = new VectorAssembler().setInputCols(colsEntree).setOutputCol("features") // Construction du Dataframe spamDFA en appliquant le VectorAssembler val spamDFA = assembleur.transform(spamDF) spamDFA.printSchema() spamDFA.select("features").show(3) La colonne *features* contient désormais un vecteur (type ``Vector``) qui rassemble les 57 valeurs des colonnes initiales. Nous pouvons désormais appliquer l'ACP sur les données de ``spamDFA.select("features")``. Par défaut, Spark applique une ACP centrée mais non normée : .. code-block:: scala import org.apache.spark.ml.feature.PCA // Construction et application d'une nouvelle instance d'estimateur PCA val pca = new PCA().setInputCol("features").setOutputCol("pcaFeatures") .setK(3).fit(spamDFA) // Application du « transformateur » PCAModel résultant de l'estimation val resultat = pca.transform(spamDFA).select("pcaFeatures") // Les 3 plus grandes valeurs propres (exprimées en proportion de variance expliquée) println(pca.explainedVariance) // Vecteurs propres correspondants println(pca.pc) Afin d'obtenir une ACP centrée et normée, il est nécessaire de normaliser les variables (les transformer pour que chaque variable soit de moyenne nulle et écart-type égal à 1) avant application de l'ACP. Pour cela, il est possible de faire appel à ``StandardScaler`` : .. code-block:: scala import org.apache.spark.ml.feature.StandardScaler // Construction d'une nouvelle instance d'estimateur StandardScaler val scaler = new StandardScaler().setInputCol("features").setOutputCol("scaledFeatures") .setWithStd(true).setWithMean(true) // Application de la nouvelle instance d'estimateur StandardScaler val scalerModel = scaler.fit(spamDFA) La variable ``scalerModel`` contient désormais le modèle de normalisation adapté au jeu de données ``spamDFA``. Il est possible d'inspecter les statistiques qui ont été estimées sur le ``DataFrame`` : .. code-block:: scala // Examen des moyennes et des variances calculées dans le StandardScalerModel résultant println(scalerModel.mean) println(scalerModel.std) Une fois le modèle obtenu, il peut être appliqué sur n'importe quel jeu de données afin de le normaliser. Dans notre cas, nous l'utilisons pour normaliser (centrage et réduction) les éléments du jeu de données ``spamDFA`` : .. code-block:: scala // Application du transformer StandardScalerModel résultant val scaledSpamDF = scalerModel.transform(spamDFA).select("scaledFeatures", "label") // scaledSpamDF a une seule colonne de Vectors, nommée "scaledFeatures" scaledSpamDF.printSchema .. admonition:: Question : Appliquez l'ACP sur le *DataFrame* ``scaledSpamDF`` et mettez les projections des données sur les trois premiers axes principaux dans un *DataFrame* ``resultatCR``. Examinez les trois plus grandes valeurs propres et comparez-les à celles obtenues précédemment. Que constatez-vous ? .. ifconfig:: tpscala in ('private') .. only:: jupyter .. code-block:: scala .. ifconfig:: tpscala in ('public') .. admonition:: Correction .. code-block:: scala // Construction et application d'une nouvelle instance d'estimateur PCA val pcaCR = new PCA().setInputCol("scaledFeatures").setOutputCol("pcaCRFeatures") .setK(3).fit(scaledSpamDF) // Application du « transformateur » PCAModel résultant de l'estimation val resultatCR = pcaCR.transform(scaledSpamDF).select("pcaCRFeatures") // Les 3 plus grandes valeurs propres (exprimées en proportion de variance expliquée) pcaCR.explainedVariance Les écarts entre les 3 plus grandes valeurs propres sont plus faibles sur les données centrées et réduites que sur les données seulement centrées. Cela montre que les variables initiales avaient des variances très différentes (après centrage). .. admonition:: Question : Appliquez l'ACP non normée sur un échantillon (appelé ``spamEch3``) de 10% du *DataFrame* ``spamDF`` obtenu par échantillonnage simple et mettez les projections des données sur les trois premiers axes principaux dans un *DataFrame* ``resultatEch3``. Examinez les trois plus grandes valeurs propres et comparez-les à celles obtenues dans l'ACP non normée de ``spamDF``. Que constatez-vous ? .. ifconfig:: tpscala in ('private') .. only:: jupyter .. code-block:: scala .. ifconfig:: tpscala in ('public') .. admonition:: Correction .. code-block:: scala val spamEch3 = spamDFA.sample(false, 0.1) val pcaEch3 = new PCA().setInputCol("features").setOutputCol("pcaFeatures") .setK(3).fit(spamEch3) val resultatEch3 = pcaEch3.transform(spamEch3).select("pcaFeatures") pcaEch3.explainedVariance Les valeurs propres obtenues sur un échantillon d'environ 10% sont naturellement différentes de celles obtenues sur la totalité des données, les écarts (relatifs) étant toutefois plus faible pour les valeurs propres les plus grandes que pour les suivantes. ACP sur des données aléatoires ------------------------------ À titre d'exemple, nous pouvons également appliquer l'ACP sur des données tridimensionnelles obtenues par tirage aléatoire suivant une loi normale isotrope : .. code-block:: scala // Création d'un Dataframe d'identifiants entre 0 et 4600 val idsDF = spark.range(4601) // Création d'un dataframe avec 3 colonnes de valeurs aléatoires // tirées d'une loi normale d'espérance nulle et écart-type 1 val randnDF = idsDF.select($"id", randn(seed=1).alias("normale0"), randn(seed=2).alias("normale1"), randn(seed=3).alias("normale2")) // Vérification randnDF.describe().show() // Construction du VectorAssembler val colsEntRnd = {for (i <- 0 until 3) yield "normale"+i.toString}.toArray val assRnd = new VectorAssembler().setInputCols(colsEntRnd).setOutputCol("features") // Construction du Dataframe randnDF2 en appliquant le VectorAssembler val randnDF2 = assRnd.transform(randnDF) randnDF2.printSchema() // Application de l'ACP val pcaRnd = new PCA().setInputCol("features").setOutputCol("pcaRndFeatures") .setK(3).fit(randnDF2) val resRnd = pcaRnd.transform(randnDF2).select("pcaRndFeatures") pcaRnd.explainedVariance .. admonition:: Question : Examinez les trois premières valeurs propres et expliquez le résultat obtenu. .. ifconfig:: tpscala in ('public') .. admonition:: Correction : Les valeurs propres obtenues sur les données aléatoires suivant une loi normale isotrope sont (pratiquement) égales, le nuage de points est presque sphérique ; les vecteurs propres ne sont pas utiles dans ce cas car une faible perturbation dans l'échantillon aléatoire suivant la loi normale isotrope aura comme conséquence un changement potentiellement fort dans les vecteurs propres associés aux trois plus grandes valeurs propres. Approfondissement : visualiser les résultats avec Vegas ======================================================= Dans cette séance de travaux pratiques nous visualiserons les données à l'aide de la bibliothèque `Vegas pour Scala `_. Nous avons précédemment chargé celle-ci dans Zeppelin grâce à la commande "magique" ``%spark.dep`` (se référer à la première cellule du cahier Zeppelin). Nous souhaitons désormais produire un nuage de points en deux dimensions représentant chaque courriel projeté sur les deux axes principaux de l'ACP. .. admonition:: Question Réaliser une analyse en composantes centrée réduite sur ``scaledSpamDF`` mais en ne conservant cette fois-ci que les **deux** premiers axes principaux. Mettre le résultat dans une variable ``resultatCR_2D``. .. ifconfig:: tpscala in ('private') .. only:: jupyter .. code-block:: scala .. ifconfig:: tpscala in ('public') .. admonition:: Correction Il s'agit simplement d'appliquer sur les données centrées-réduites une ACP avec :math:`k = 2`. .. code-block:: scala val pcaCR = new PCA().setInputCol("scaledFeatures").setOutputCol("pcaCRFeatures") .setK(2).fit(scaledSpamDF) // Application du « transformateur » PCAModel résultant de l'estimation val resultatCR_2D = pcaCR.transform(scaledSpamDF).select("pcaCRFeatures") Afin de réaliser la visualisation avec Vegas, il est nécessaire de transformer les vecteurs ``features`` de l'ACP en deux colonnes séparées contenant les coordonnées en ``x`` et ``y``. .. code-block:: scala import org.apache.spark.ml.linalg.Vector import org.apache.spark.sql.functions.udf // Extrait le premier scalaire d'un vecteur et le place dans une colonne val first = udf((v: Vector) => v.toArray(0)) // Extrait le deuxième scalaire d'un vecteur et le place dans une colonne val second = udf((v: Vector) => v.toArray(1)) val coordinates = resultatCR_2D.withColumn("x", first($"pcaCRFeatures")).withColumn("y", second($"pcaCRFeatures")) coordinates.printSchema() Le ``DataFrame`` résultant contient maintenant deux nouvelles colonnes ``x`` et ``y`` qui contiennent les coordonnées de chaque élément du ``DataFrame`` initial ``spamDF`` projeté selon les deux axes principaux après ACP. Nous pouvons maintenant effectuer la visualisation avec Vegas. .. admonition:: Note Attention, la visualisation nécessite d'avoir au préalable importé la bibliothèque `vegas-viz `_. Cela peut se faire dynamiquement avec Zeppelin en utilisant la commande ``%spark.dep`` ou en ligne de commande avec spark-shell en utilisant l'option ``--packages "org.vegas-viz:vegas_2.11:0.3.11","org.vegas-viz:vegas-spark_2.11:0.3.11"``. Pour ce TP, nous supposerons que vous travaillez avec Zeppelin. Si ce n'est pas le cas, vous devrez adapter la variable ``render`` pour utiliser le `moteur de rendu adéquat `_. .. code-block:: scala implicit val render = vegas.render.ShowHTML(s => print("%html " + s)) import vegas._ import vegas.data.External._ import vegas.sparkExt._ Vegas("PCA 2D").withDataFrame(coordinates).mark(Point).encodeX("x", Quant).encodeY("y", Quant).show L'utilisation de Vegas se fait de la façon suivante : ``withDataFrame`` permet de spécifier que l'on veut visualiser le contenu d'un ``DataFrame`` spécifique, ``mark`` indique que nos observations seront identifiées par des points et ``.encodeX/encodeY`` précisent quelles colonnes correspondent aux coordonnées des points. Les paramètres ``Quant`` signifie que les colonnes ``x`` et ``y`` représentent des variables **quantitatives**. Les variables nominatives sont spécifiées par ``Nom``. .. admonition:: Question Réaliser la même visualisation dans le cas des données aléatoires. Que constatez-vous ? .. ifconfig:: tpscala in ('private') .. only:: jupyter .. code-block:: scala .. ifconfig:: tpscala in ('public') .. admonition:: Correction : .. code-block:: scala val output = resRnd.withColumn("x", first($"pcaRndFeatures")).withColumn("y", second($"pcaRndFeatures")) Vegas("PCA 2D").withDataFrame(output).mark(Point).encodeX("x", Quant).encodeY("y", Quant).show Dans le cas des données aléatoires, l'analyse en composantes principales ne permet pas de déterminer de direction principale car la distribution des données est isotrope. Par conséquent, la visualisation obtenue est simplement une boule bidimensionnelle. .. admonition:: Question Pour terminer, effectuez cette visualisation mais en ajoutant une couleur différente en fonction de l'étiquette du point considéré. .. ifconfig:: tpscala in ('private') .. only:: jupyter .. code-block:: scala .. ifconfig:: tpscala in ('public') .. admonition:: Correction Pour visualiser les étiquettes, il faut conserver la colonne ``label`` après l'ACP : .. code-block:: scala val resultatCR_2D = pcaCR.transform(scaledSpamDF).select("pcaCRFeatures", "label") On peut ensuite ajouter l'option ``.encodeColor("label", Nom)`` à la visualisation précédente. .. code-block:: scala val output = resultatCR_2D.withColumn("x", first($"pcaCRFeatures")).withColumn("y", second($"pcaCRFeatures")) Vegas("PCA 2D").withDataFrame(output).mark(Point).encodeX("x", Quant).encodeY("y", Quant).encodeColor("label", Nom).show