.. _chap-tpClassificationAutomatique: ############################################################# Travaux pratiques - Classification automatique avec *k-means* ############################################################# .. only:: html .. container:: notebook .. image:: _static/zeppelin_classic_logo.png :class: svg-inline `Cahier Zeppelin `_ (`la version précédente de cette séance, utilisant l'API RDD `_) Références externes utiles : * `Documentation Spark `_ * `Documentation API Spark en Scala `_ * `Documentation Scala `_ **L'objectif** de cette séance de TP est de présenter l'utilisation dans Spark de la méthode *k-means* de classification automatique (avec une initialisation par *k-means||*), ainsi que la définition et l'utilisation de *pipelines* (enchaînements de traitements successifs) dans Spark. Nous travaillerons lors de cette séance de travaux pratiques d'abord sur des données générées pendant la séance. Vous aurez à traiter ensuite les données `Spambase Data Set issues de l'archive de l'UCI `_. Nous vous incitons à **regarder sur le site de l'UCI** des explications plus détaillées concernant ces données, notamment la **signification des variables** et les classes auxquelles appartiennent les données. .. only:: jupyter .. code-block:: scala %spark.dep z.load("org.vegas-viz:vegas_2.11:0.3.11") z.load("org.vegas-viz:vegas-spark_2.11:0.3.11") .. only:: html .. code-block:: bash $ spark-shell Si la commande ``spark-shell`` n'est pas trouvée, entrez d'abord ``export PATH="$PATH:/opt/spark/bin"`` et ensuite ``spark-shell``. Nous aurons besoin de Vegas par la suite, pensez à charger cette dépendance dans Spark avant de commencer. Génération des données ====================== Nous nous servirons de ``KMeansDataGenerator.generateKMeansRDD`` pour générer un jeu de données synthétique sur lequel nous appliquerons le *clustering*. Ces données sont produites d'abord sous la forme d'un *RDD* qui est ensuite transformé en *DataFrame*. Cette méthode choisit d'abord *k* centres de groupes à partir d'une loi normale :math:`d`-dimensionnelle, d'écart-type :math:`r` et ensuite crée autour de chaque centre un groupe à partir d'une loi normale :math:`d`-dimensionnelle d'écart-type 1. Les paramètres d'appel sont : * ``sc`` : le ``SparkContext`` employé (ici, celui par défaut de ``spark-shell``) ; * ``numPoints`` : le nombre total de données générées ; * ``k`` : le nombre de centres (donc de groupes dans les données générées) ; * ``d`` : la dimension des données ; * ``r`` : écart-type pour la loi normale qui génère les centres ; * ``numPartitions`` : nombre de partitions pour le RDD généré (2 par défaut, ici nous utiliserons 1 seule). (se référer à la `documentation `_ pour plus de détails) Nous générerons 1000 données bidimensionnelles dans 5 groupes en utilisant : .. code-block:: scala import org.apache.spark.sql.Row import org.apache.spark.ml.linalg.Vectors import org.apache.spark.mllib.util.KMeansDataGenerator // Générer les données val donneesGenerees = KMeansDataGenerator.generateKMeansRDD(sc, 1000, 5, 2, 5, 1) .map(l => Vectors.dense(l)) .map(v => Row(v)) donneesGenerees.take(2) Les données obtenues sont bien des vecteurs à deux coordonnées. Nous pouvons maintenant transformer ce RDD en DataFrame contenant une seule colonne ``features`` de type ``Vector`` : .. code-block:: scala // Construction d'un DataFrame à partir du RDD import org.apache.spark.sql.types._ import org.apache.spark.ml.linalg.SQLDataTypes.VectorType val schemaVecteurs = StructType(Seq(StructField("features", VectorType, true))) val vecteursGroupesDF = spark.createDataFrame(donneesGenerees, schemaVecteurs).cache() vecteursGroupesDF.show(false) Nous pouvons visualiser les points comme dans les TP précédents en s'appuyant sur Vegas : .. code-block:: scala import org.apache.spark.ml.linalg.Vector import org.apache.spark.sql.functions.udf // Extrait le premier scalaire d'un vecteur et le place dans une colonne val first = udf((v: Vector) => v.toArray(0)) // Extrait le deuxième scalaire d'un vecteur et le place dans une colonne val second = udf((v: Vector) => v.toArray(1)) La visualisation se fait de la façon suivante : .. code-block:: scala // Import des bibliothèques de Vegas implicit val render = vegas.render.ShowHTML(s => print("%html " + s)) import vegas._ import vegas.data.External._ import vegas.sparkExt._ // Construction du nuage de points val points = vecteursGroupesDF.withColumn("x", first($"features")).withColumn("y", second($"features")) Vegas("Données initiales").withDataFrame(points).mark(Point).encodeX("x", Quant).encodeY("y", Quant).show Nous pouvons vérifier que les données ont bien été générées aléatoirement par cinq gaussiennes modérément bien séparées les unes des autres. *K-means* avec initialisation par K-means|| =========================================== La *MLlib* de Spark propose une implémentation de l'algorithme de classification automatique *k-means*. L'initialisation des centres est réalisée par l'algorithme parallèle `k-means|| `_, vu aussi en cours, avec 5 itérations (``initSteps: 5``, peut être modifié avec ``KMeans.setInitSteps``). La classification est obtenue en appelant ``KMeans.fit``. Différents paramètres peuvent être définis avec les méthodes suivantes (à appeler pour l'instance de ``KMeans`` créée) : * ``setFeaturesCol(value: String)`` : nom de la colonne qui contient les données à classifier, sous forme de vecteurs (par défaut ``"features"``) ; * ``setK(value: Int)`` : nombre de groupes ou *clusters* à obtenir (par défaut 2) ; * ``setMaxIter(value: Int)`` : nombre maximal d'itérations ; arrêt de l'exécution lorsque les centres sont stabilisés (paramètre ``tol``, voir ``setTol`` ci-dessous) ou ``maxIter`` est atteint ; * ``setPredictionCol(value: String)`` : nom de la colonne qui contient les « prédictions », c'est à dire le numéro de groupe pour chaque observation (par défaut ``"prediction"``) ; * ``setSeed(value: long)`` : valeur d'initialisation du générateur de nombres aléatoires (la préciser permet de reproduire les résultats ultérieurement) ; * ``setTol(value: Double)`` : définition de la valeur de la tolérance (``tol``) pour l'évaluation de la convergence de l'algorithme ; * ``setInitMode(value: String)`` : méthode d'initialisation, soit ``"random"``, soit ``"k-means||"`` (par défaut ``"k-means||"``) ; * ``setInitSteps(value: Int)`` : nombre de pas pour l'initialisation par ``k-means||`` (2 par défaut). Se référer à la `documentation `_ pour plus de détails. Voici un exemple de code Scala pour Spark permettant de réaliser la classification automatique (*clustering*) avec KMeans. Commençons par appliquer k-means à notre *DataFrame* : .. code-block:: scala import org.apache.spark.ml.clustering.KMeans // Appliquer k-means val kmeans = new KMeans().setK(5).setMaxIter(200).setSeed(1L) val modele = kmeans.fit(vecteursGroupesDF) Une fois ceci fait, nous pouvons calculer l'inertie globale de cette classification (cela permet de comparer deux classifications différentes pour choisir la meilleure de :math:`k`, par exemple) et afficher les coordonnées des centres des groupes trouvés. .. code-block:: scala // Évaluer la classification par la somme des inerties intra-classe val wsse = modele.computeCost(vecteursGroupesDF) // Afficher les centres des groupes modele.clusterCenters.foreach(println) Pour obtenir les indices des groupes et réaliser la classification à proprement parler, il suffit d'appliquer la méthode ``transform`` au *DataFrame* souhaité : .. code-block:: scala // Trouver l'indice de groupe pour chaque donnée val resultat = modele.transform(vecteursGroupesDF) resultat.show(5) Le *DataFrame* ``resultat`` contient alors une colonne ``prediction`` qui, pour chaque observation du jeu de données initial, correspond au numéro du groupe (*cluster*) dans lequel l'algorithme des k-moyennes l'a affectée. La visualisation avec Vegas se fait en ajoutant l'option ``encodeColor`` en utilisant la valeur de la colonne ``prediction`` : .. code-block:: scala val points = resultat.withColumn("x", first($"features")).withColumn("y", second($"features")) Vegas("K-Means").withDataFrame(points).mark(Point).encodeX("x", Quant).encodeY("y", Quant).encodeColor("prediction", Nom).show Questions ========= .. admonition:: Question : Réalisez une nouvelle fois la classification des mêmes données avec *k-means* en 5 groupes mais une valeur différente dans ``.setSeed()``. Visualisez à nouveau les résultats (mais en gardant les graphes initiaux ouverts pour pouvoir comparer). Que constatez-vous ? .. ifconfig:: tpscala in ('public') .. admonition:: Correction : En général les indices des groupes changent : un même groupe n'a pas la même couleur dans les deux fenêtres de visualisation. En revanche, les regroupements obtenus sont souvent les mêmes entre les deux exécutions de *k-means*. .. admonition:: Question : Réalisez deux fois la classification des mêmes données avec *k-means* en 5 groupes mais avec une initialisation ``random`` plutôt que ``k-means||`` et des valeurs différentes dans ``.setSeed()``. .. Visualisez les résultats en ouvrant à chaque fois une autre fenêtre terminal et en lançant ``gnuplot`` dans cette fenêtre (afin de conserver toutes les fenêtres de visualisation). Que constatez-vous ? .. ifconfig:: tpscala in ('private') .. only:: jupyter .. code-block:: scala .. ifconfig:: tpscala in ('public') .. admonition:: Correction : .. code-block:: scala val kmeansR = new KMeans().setK(5).setMaxIter(200).setInitMode("random").setSeed(1L) val modeleR = kmeansR.fit(vecteursGroupesDF) val wsseR = modeleR.computeCost(vecteursGroupesDF) Non seulement les indices des groupes changent, mais en plus les regroupements obtenus *peuvent être différents* entre les deux exécutions de *k-means* (même si cela ne se produit pas systématiquement, surtout si les groupes générés au départ sont bien séparés entre eux). Les résultats dépendent plus de l'initialisation lorsque celle-ci est aléatoire plutôt que réalisée avec ``k-means||``. .. admonition:: Question : Réalisez la classification des mêmes données avec *k-means* et une initialisation ``k-means||`` en 4 groupes et ensuite en 6 groupes. Visualisez à chaque fois les résultats. Que constatez-vous ? .. ifconfig:: tpscala in ('private') .. only:: jupyter .. code-block:: scala .. ifconfig:: tpscala in ('public') .. admonition:: Correction : Avec 4 groupes, les groupes précédents les plus proches sont fusionnés. Avec 6 groupes, un des groupes précédents est divisé en deux. De plus, en exécutant la classification plusieurs fois, on peut constater une relative dépendance des résultats de l'intialisation malgré l'utilisation de ``k-means||``. .. admonition:: Question (optionnelle) : Multipliez par 3 les données sur une des dimensions initiales en vous servant de `ElementwiseProduct `_. Réalisez la classification des nouvelles données avec *k-means* (et initialisation par ``k-means||``) en 5 groupes. Visualisez les résultats. Que constatez-vous ? .. ifconfig:: tpscala in ('public') .. admonition:: Correction : Nous nous servons de `ElementwiseProduct `_ : .. code-block:: scala import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.feature.ElementwiseProduct // On contracte le nuage d'observations initial par 5 selon l'axe x val amplifications = Vectors.dense(0.2, 1.0) val amplifieur = new ElementwiseProduct().setScalingVec(amplifications).setInputCol("features").setOutputCol("featuresMod") val vecteursGroupesModDF = amplifieur.transform(vecteursGroupesDF) val kmeansMod = new KMeans().setFeaturesCol("featuresMod").setK(5).setMaxIter(200).setSeed(1L) val modeleMod = kmeansMod.fit(vecteursGroupesModDF) val wsseMod = modeleMod.computeCost(vecteursGroupesModDF) val indicesMod = modeleMod.transform(vecteursGroupesModDF) On constate que les regroupements ne sont plus les mêmes (des exceptions sont néanmoins possibles), car la distance entre les données est modifiée par la « dilatation » importante d'une des variables. .. admonition:: Question (optionnelle) : Réalisez la classification des données `Spambase Data Set issues de l'archive de l'UCI `_. Comment pré-traiter les données et pourquoi ? Combien de groupes rechercher ? Visualisez ensuite leurs projections sur des groupes de 3 variables. .. ifconfig:: tpscala in ('private') .. only:: jupyter .. code-block:: scala .. ifconfig:: tpscala in ('public') .. admonition:: Correction : Les variables ne sont pas de variances comparables ; sans centrage et réduction, les variables de variance très élevée auront un impact déterminant sur les résultats de la classification automatique. Il est donc envisageable de centrer et réduire d'abord les variables. Les données correspondent à 2 classes, « spam » et « non spam », il donc est envisageable de faire d'abord une classification automatique en 2 groupes. Il est ensuite possible de rechercher plus de 2 groupes car dans chaque classe les données peuvent former plusieurs groupes distincts. La visualisation des projections sur des groupes de 3 variables peut être peu informative car au départ il y a 57 variables. Le `TP optionnel sur l'analyse factorielle discriminante `_ montre comment il est possible d'obtenir une visualisation en 3D dans laquelle la séparation entre groupes est optimisée. Les flux de travail (*ML pipelines*) ==================================== Spark permet de définir des flux de travail (ou *workflows*, ou `ML pipelines `_) qui enchaînent différentes étapes de traitement des données se trouvant dans des *DataFrames*. Les composants des *pipelines* sont les *transformers* (transformateurs) et les *estimators* (estimateurs). Un *transformer* est un algorithme qui transforme un *DataFrame* en un autre *DataFrame*, en général par ajout d'une ou plusieurs colonnes. La transformation peut être, par exemple, le centrage et la réduction des variables numériques (transformer chaque variable pour qu'elle soit de moyenne nulle et écart-type égal à 1) qui décrivent les observations ou la transformation de textes en vecteurs. Chaque *transformer* possède une méthode ``.transform()`` qui, en général, crée une ou plusieurs colonnes et les ajoute au *DataFrame* reçu en entrée. Un *estimator* est un algorithme qui permet de construire un **modèle** à partir de données d'un *DataFrame*. Une fois construit, ce modèle sera un *transformer*. Chaque *estimator* possède une méthode ``.fit()``. Par exemple, pour centrer et réduire les variables numériques décrivant des observations il est nécessaire d'utiliser un *estimator* qui détermine les moyennes et les écart-types à partir des données ; ces moyennes et écart-types constituent le modèle qui, appliqué comme un *transformer*, permet ensuite d'obtenir le centratge et la réduction pour les données sur lesquelles il a été obtenu (avec ``.fit()``) mais aussi sur d'autres données (par exemple, de nouvelles observations concernant le même problème). Nous avons déjà vu la construction d'un modèle avec ``.fit()`` et son utilisation ultérieure avec ``.transform()`` dans le TP précédent `lorsque nous avons centré et réduit les variables avant application de l'ACP `_. Dans cette séance de TP nous examinerons la construction d'un modèle descriptif par classification automatique, réalisée par un *estimator*. Le modèle résultant est un *transformer* qui, appliqué aux données d'un *DataFrame*, fournit pour chaque observation le numéro du centre de groupe le plus proche (donc le numéro du groupe auquel cette observation est « affectée »). Les *transformers* et les *estimators* emploient une interface commune *parameter* pour spécifier les paramètres utilisés. Un *pipeline* permet d'enchaîner des composants qui, au départ, peuvent être des *estimators* et, après estimation, sont employés comme des *transformers*. Le *pipeline* dispose ainsi #. d'une méthode ``.fit()`` qui permet d'enchaîner les opérations d'estimation pour tous les *estimators* composant le *pipeline*, #. ainsi que d'une méthode ``.transform()`` qui enchaîne les traitements des données par les différents modèles estimés, devenus des *transformers*. Dans la documentation de Spark, ce processus est illustré dans `cet exemple `_. Nous aborderons un autre exemple de *pipeline* dans la suite de cette séance. Construction de *pipeline* simple ================================= Afin de centrer et réduire les données *Spambase Data Set* et d'appliquer ensuite la classification automatique, construisons un *pipeline* qui regroupe ces deux opérations : .. code-block:: scala import org.apache.spark.ml.{Pipeline, PipelineModel} import org.apache.spark.ml.feature.StandardScaler import org.apache.spark.ml.clustering.KMeans import org.apache.spark.ml.feature.VectorAssembler // Construction de DataFrame avec les données de Spambase def genSpamFields(from: Int, to: Int) = for (i <- from until to) yield StructField("val"+i.toString, DoubleType, true) val spamFields = genSpamFields(0, 57) val spamSchema = StructType(spamFields).add("label", DoubleType, true) val spamDF = spark.read.format("csv").schema(spamSchema).load("tpacp/data/spambase.data") val colsEntree = {for (i <- 0 until 57) yield "val"+i.toString}.toArray val assembleur = new VectorAssembler().setInputCols(colsEntree).setOutputCol("features") val spamDFA = assembleur.transform(spamDF).cache() // Partitionnement en données de train (80%) et données de test (20%) val spamSplits = spamDFA.randomSplit(Array(0.8, 0.2)) // Construction d'un pipeline val scaler = new StandardScaler().setInputCol("features") .setOutputCol("scaledFeatures") .setWithStd(true) .setWithMean(true) val kmeansNS = new KMeans().setFeaturesCol(scaler.getOutputCol) .setPredictionCol("predictionNS") .setK(5) .setMaxIter(200) .setSeed(1L) val pipeline = new Pipeline().setStages(Array(scaler, kmeansNS)) L'objet ``pipeline`` contient désormais deux opérations : la normalisation (``scaler``) et la classification (``kmeansNS``). Ces deux opérations (*stages*) seront appliquées successivement sur les données passées aux méthodes ``fit`` et ``transform`` du *pipeline*. Par exemple, nous pouvons réaliser l'apprentissage du modèle *KMeans* avec normalisation de la façon suivante : .. code-block:: scala // Estimation des modèles du pipeline (sur données *train*): // le premier modèle (centrage et réduction) est estimé en premier, // ensuite il est utilisé comme transformer pour modifier les données // et c'est le second modèle qui est estimé (clustering) // val modeleKMNS = pipeline.fit(spamSplits(0)) Et l'ensemle peut s'appliquer en inférence sur de nouvelles données comme suit : .. code-block:: scala // Application du modèle pour prédire les groupes des données de test // val indicesSpamTest = modeleKMNS.transform(spamSplits(1)).select("scaledFeatures","predictionNS") Un *pipeline* (ou n'importe lequel de ses sous-modèles) peut être sauvegardé pour être réutilisé plus tard : .. code-block:: scala // Un pipeline peut être sauvegardé (avant estimation) pipeline.write.overwrite().save("spark-pipeline-normEtKMeans") // Un modèle issu d'un pipeline (après estimation) peut être sauvegardé modeleKMNS.write.overwrite().save("spark-modele-clustering-spam") // Un modèle sauvegardé peut être chargé pour être employé val memeModele = PipelineModel.load("spark-modele-clustering-spam") .. admonition:: Question Réaliser la visualisation avec Vegas de la classification automatique sur le jeu de données Spambase. Cette visualisation est faite sur les deux premières dimensions. Quelles que soient les deux dimensions choisies (parmi les 57 de départ) pour visualiser les groupes, elles seront probablement peu représentatives de cette séparation en groupes. Quelle méthode employer afin d'améliorer la visualisation ?