Travaux pratiques - Classification automatique avec k-means

(la version précédente de cette séance, utilisant l’API RDD)

Références externes utiles :

Nous travaillerons lors de cette séance sur des données générées lors de la séance de travaux pratiques. Vous aurez à traiter ensuite les données Spambase Data Set issues de l’archive de l’UCI. Nous vous incitons à regarder sur le site de l’UCI des explications plus détaillées concernant ces données, notamment la signification des variables et les classes auxquelles appartiennent les données.

Ouvrez une fenêtre terminal et entrez

$ cd ~/.
$ mkdir -p tpkmeans/data
$ cd tpkmeans
$ spark-shell

Si la commande spark-shell n’est pas trouvée, entrez d’abord export PATH="$PATH:/opt/spark/bin" et ensuite spark-shell.

Les flux de travail (ML pipelines)

Spark permet de définir des flux de travail (ou workflows, ou ML pipelines) qui enchaînent différentes étapes de traitement des données se trouvant dans des DataFrames. Les composants des pipelines sont les transformers (transformateurs) et les estimators (estimateurs).

Un transformer est un algorithme qui transforme un DataFrame en un autre DataFrame, en général par ajout d’une ou plusieurs colonnes. La transformation peut être, par exemple, la normalisation des variables numériques qui décrivent les observations ou la transformation de textes en vecteurs. Chaque transformer possède une méthode .transform() qui, en général, crée une ou plusieurs colonnes et les ajoute au DataFrame reçu en entrée.

Un estimator est un algorithme qui permet de construire un modèle à partir de données d’un DataFrame. Une fois construit, ce modèle sera un transformer. Chaque estimator possède une méthode .fit(). Par exemple, pour normaliser les variables numériques décrivant des observations il est nécessaire d’utiliser un estimator qui détermine les moyennes et les écart-types à partir des données ; ces moyennes et écart-types constituent le modèle qui, appliqué comme un transformer, permet ensuite d’obtenir la normalisation pour les données sur lesquelles il a été obtenu (avec .fit()) mais aussi sur d’autres données (par exemple, de nouvelles observations concernant le même problème). Nous avons déjà vu la construction d’un modèle avec .fit() et son utilisation ultérieure avec .transform() dans le TP précédent lorsque nous avons normalisé les données avant application de l’ACP. Dans cette séance de TP nous examinerons la construction d’un modèle descriptif par classification automatique, réalisée par un estimator. Le modèle résultant est un transformer qui, appliqué aux données d’un DataFrame, fournit pour chaque observation le numéro du centre de groupe le plus proche (dont le numéro du groupe auquel cette observation est « affectée »).

Les transformers et les estimators emploient une interface commune parameter pour spécifier les paramètres utilisés.

Un pipeline permet d’enchaîner des composants qui, au départ, peuvent être des estimators et, après estimation, sont employés comme des transformers. Le pipeline dispose ainsi

  1. d’une méthode .fit() qui permet d’enchaîner les opérations d’estimation pour tous les estimators composant le pipeline,
  2. ainsi que d’une méthode .transform() qui enchaîne les traitements des données par les différents modèles estimés, devenus des transformers.

Ce processus est illustré dans cet exemple. Nous aborderons un autre exemple de pipeline à la fin de cette séance de TP.

Génération des données

Nous nous servirons de KMeansDataGenerator.generateKMeansRDD pour obtenir les données de travail sous la forme d’un RDD qui est ensuite transformé en DataFrame. Cette méthode choisit d’abord k centres de groupes à partir d’une loi normale \(d\)-dimensionnelle, d’écart-type \(r\) et ensuite crée autour de chaque centre un groupe à partir d’une loi normale \(d\)-dimensionnelle d’écart-type 1. Les paramètres d’appel sont :

  • sc : le SparkContext employé (ici, celui par défaut de spark-shell) ;
  • numPoints : le nombre total de données générées ;
  • k : le nombre de centres (donc de groupes dans les données générées) ;
  • d : la dimension des données ;
  • r : écart-type pour la loi normale qui génère les centres ;
  • numPartitions : nombre de partitions pour le RDD généré (2 par défaut, ici nous utiliserons 1 seule).

Nous générerons 1000 données tridimensionnelles dans 5 groupes en utilisant :

scala> import org.apache.spark.sql.Row
scala> import org.apache.spark.ml.linalg.Vectors
scala> import org.apache.spark.mllib.util.KMeansDataGenerator

// Générer les données
scala> val donneesGenerees = KMeansDataGenerator.generateKMeansRDD(sc, 1000, 5, 3, 5, 1)
                                                .map(l => Vectors.dense(l))
                                                .map(v => Row(v))
scala> donneesGenerees.take(2)

// Construction d'un DataFrame à partir du RDD
scala> import org.apache.spark.sql.types._
scala> import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
scala> val schemaVecteurs = StructType(Seq(StructField("features", VectorType, true)))
scala> val vecteursGroupesDF = spark.createDataFrame(donneesGenerees, schemaVecteurs).cache()
scala> vecteursGroupesDF.show(false)

// Enregistrer les données dans un fichier texte
scala> vecteursGroupesDF.map(v => v.toString.filter(c => c != '[' & c != ']'))
                        .write.text("data/vecteursGroupes")

Pour un DataFrame plus grand, afin d’obtenir un seul fichier de sortie il est nécessaire d’employer .repartition(1) avant l’écriture : dataframe.repartition(1).map(v => v.toString.filter(...).write.text("...").

Renommez le fichier de sortie ~/tpkmeans/data/vecteursGroupes/part-00000-... en ~/tpkmeans/data/vecteursGroupes/vecteurs

K-means avec initialisation par K-means||

Spark MLlib propose une implémentation de l’algorithme de classification automatique k-means. L’initialisation des centres est réalisée par l’algorithme parallèle k-means||, vu aussi en cours, avec 5 itérations (initializationSteps: 5, peut être modifié avec KMeans.setInitializationSteps).

La classification est obtenue en appelant KMeans.fit. Différents paramètres peuvent être définis avec les méthodes suivantes (à appeler pour l’instance de KMeans créée) :
  • setFeaturesCol(value: String) : nom de la colonne qui contient les données à classifier, sous forme de vecteurs (par défaut "features") ;
  • setK(value: Int) : nombre de groupes ou clusters à obtenir (par défaut 2) ;
  • setMaxIter(value: Int) : nombre maximal d’itérations ; arrêt de l’exécution lorsque les centres sont stabilisés (paramètre tol, voir setTol ci-dessous) ou maxIter est atteint ;
  • setPredictionCol(value: String) : nom de la colonne qui contient les « prédictions », c’est à dire le numéro de groupe pour chaque observation (par défaut "prediction") ;
  • setSeed(value: long) : valeur d’initialisation du générateur de nombres aléatoires (la préciser permet de reproduire les résultats ultérieurement) ;
  • setTol(value: Double) : définition de la valeur de la tolérance (tol) pour l’évaluation de la convergence de l’algorithme ;
  • setInitMode(value: String) : méthode d’initialisation, soit "random", soit "k-means||" (par défaut "k-means||") ;
  • setInitSteps(value: Int) : nombre de pas pour l’initialisation par k-means|| (2 par défaut).

Pour réaliser la classification automatique entrez dans spark-shell les commandes suivantes :

scala> import org.apache.spark.ml.clustering.KMeans

// Appliquer k-means
scala> val kmeans = new KMeans().setK(5).setMaxIter(200).setSeed(1L)
scala> val modele = kmeans.fit(vecteursGroupesDF)

// Evaluer la classification par la somme des inerties intra-classe
scala> val wsse = modele.computeCost(vecteursGroupesDF)

// Afficher les centres des groupes
scala> modele.clusterCenters.foreach(println)

// Trouver l'indice de groupe pour chaque donnée
scala> val indices = modele.transform(vecteursGroupesDF)

// Enregistrer les indices dans un fichier texte
scala> indices.select("prediction")
              .map(p => p.toString.filter(c => c != '[' & c != ']'))
              .write.text("data/indices")

Renommez le fichier de sortie ~/tpkmeans/data/indices/part-00000-... en ~/tpkmeans/data/indices/idGroupes

Visualisation des résultats

Dans cette séance nous visualiserons les données à l’aide de gnuplot, déjà installé dans la salle de TP. Si vous travaillez sur votre ordinateur vous pouvez vous servir d’autres outils (comme matplotlib ou ggplot2) si vous les maîtrisez déjà ; il vous faudra toutefois les installer.

Pour préparer la visualisation, il est nécessaire de concaténer les lignes du fichier de données et les lignes du fichier d’indices. Nous le ferons directement dans une fenêtre terminal :

$ cd ~/tpkmeans/data
$ paste --delimiters="," vecteursGroupes/vecteurs indices/idGroupes > donneesGnuplot.txt

Nous utiliserons cette fois gnuplot à l’aide de commandes en ligne. Pour cela, ouvrez une nouvelle fenêtre terminal, placez-vous dans le répertoire ~/tpkmeans et lancez gnuplot :

$ cd ~/tpkmeans
$ gnuplot
gnuplot>

Entrez ensuite les commandes suivantes dans la fenêtre gnuplot (après l’invite gnuplot>) :

gnuplot> set datafile separator ','
gnuplot> set palette defined ( 0 "red", 1 "orange", 2 "brown", 3 "green", 4 "blue" )
gnuplot> splot "data/donneesGnuplot.txt" using 1:2:3:4 with points lc palette

Nous avons d’abord indiqué à gnuplot que le séparateur entre données d’une même ligne était la virgule (et non l’espace, séparateur par défaut), ensuite nous avons défini une palette permettant de donner à chaque point une couleur qui indique le groupe auquel il appartient et enfin nous avons appelé la fonction splot en lui indiquant que les données tridimensionnelles à afficher étaient suivies d’une quatrième dimension correspondant à la couleur du point.

Vous pouvez faire tourner le graphique en maintenant appuyé le bouton gauche de la souris et en déplaçant le pointeur dans la fenêtre graphique.

Questions

Question :

Réalisez une nouvelle fois la classification des mêmes données avec k-means en 5 groupes mais une valeur différente dans .setSeed(). Visualisez les résultats en ouvrant une autre fenêtre terminal et en lançant gnuplot dans cette fenêtre (afin de conserver la première fenêtre de visualisation). Que constatez-vous ?

Correction :

En général les indices des groupes changent : un même groupe n’a pas la même couleur dans les deux fenêtres de visualisation. En revanche, les regroupements obtenus sont les mêmes entre les deux exécutions de k-means.

Question :

Réalisez deux fois la classification des mêmes données avec k-means en 5 groupes mais avec une initialisation random plutôt que k-means|| et des valeurs différentes dans .setSeed(). Visualisez les résultats en ouvrant à chaque fois une autre fenêtre terminal et en lançant gnuplot dans cette fenêtre (afin de conserver toutes les fenêtres de visualisation). Que constatez-vous ?

Correction :

scala> val kmeansR = new KMeans().setK(5).setMaxIter(200)
                                .setInitMode("random").setSeed(1L)
scala> val modeleR = kmeansR.fit(vecteursGroupesDF)
scala> val wsseR = modeleR.computeCost(vecteursGroupesDF)

Non seulement les indices des groupes changent, mais en plus les regroupements obtenus peuvent être différents entre les deux exécutions de k-means (même si cela ne se produit pas systématiquement, surtout si les groupes générés au départ sont bien séparés entre eux). Les résultats dépendent plus de l’initialisation lorsque celle-ci est aléatoire plutôt que réalisée avec k-means||.

Question :

Réalisez la classification des mêmes données avec k-means et une initialisation k-means|| en 4 groupes et ensuite en 6 groupes. Visualisez à chaque fois les résultats. Que constatez-vous ?

Correction :

Avec 4 groupes, les groupes précédents les plus proches sont fusionnés. Avec 6 groupes, un des groupes précédents est divisé en deux. De plus, en exécutant la classification plusieurs fois, on peut constater une relative dépendance des résultats de l’intialisation malgré l’utilisation de k-means||.

Question :

Multipliez par 3 les données sur une des dimensions initiales en vous servant de ElementwiseProduct. Réalisez la classification des nouvelles données avec k-means (et initialisation par k-means||) en 5 groupes. Visualisez les résultats. Que constatez-vous ?

Correction :

Nous nous servons de ElementwiseProduct :

scala> import org.apache.spark.ml.linalg.Vectors
scala> import org.apache.spark.ml.feature.ElementwiseProduct

scala> val amplifications = Vectors.dense(3.0, 1.0, 1.0)
scala> val amplifieur = new ElementwiseProduct().setScalingVec(amplifications)
                           .setInputCol("features").setOutputCol("featuresMod")
scala> val vecteursGroupesModDF = amplifieur.transform(vecteursGroupesDF)

scala> vecteursGroupesModDF.select("featuresMod")
           .map(v => v.toString.filter(c => c != '[' & c != ']'))
           .write.text("data/vecteursGroupesMod")

scala> val kmeansMod = new KMeans().setFeaturesCol("featuresMod")
                                   .setK(5).setMaxIter(200).setSeed(1L)
scala> val modeleMod = kmeansMod.fit(vecteursGroupesModDF)
scala> val wsseMod = modeleMod.computeCost(vecteursGroupesModDF)
scala> val indicesMod = modeleMod.transform(vecteursGroupesModDF)
scala> indicesMod.select("prediction")
                 .map(p => p.toString.filter(c => c != '[' & c != ']'))
                 .write.text("data/indicesMod")

Renommez les fichiers de sortie ~/tpkmeans/data/vecteursGroupesMod/part-00000-... en ~/tpkmeans/data/vecteursGroupesMod/vecteurs et respectivement ~/tpkmeans/data/indicesMod/part-00000-... en ~/tpkmeans/data/indicesMod/idGroupes.

$ cd ~/tpkmeans/data
$ paste --delimiters="," vecteursGroupesMod/vecteurs indicesMod/idGroupes > donneesGnuplotMod.txt

Il faudra utiliser ensuite Gnuplot sur les données "data/donneesGnuplot.txt".

On constate que les regroupements ne sont plus les mêmes (des exceptions sont néanmoins possibles), car la distance entre les données est modifiée par la « dilatation » importante d’une des variables.

Question :

Réalisez la classification des données Spambase Data Set issues de l’archive de l’UCI. Comment pré-traiter les données et pourquoi ? Combien de groupes rechercher ? Visualisez ensuite leurs projections sur des groupes de 3 variables.

Correction :

Les variables ne sont pas de variances comparables ; sans normalisation, les variables de variance très élevée auront un impact déterminant sur les résultats de la classification automatique. Il est donc envisageable de normaliser d’abord les variables.

Les données correspondent à 2 classes, « spam » et « non spam ». Il est envisageable de faire d’abord une classification automatique en 2 groupes. Il est ensuite possible de rechercher plus de 2 groupes car dans chaque classe les données peuvent se regrouper dans plusieurs classes.

La visualisation des projections sur des groupes de 3 variables peut être peu informative car au départ il y a 57 variables.

Construction de pipeline simple

Afin de normaliser les données Spambase Data Set et d’appliquer ensuite la classification automatique, construisons un pipeline qui regroupe ces deux opérations :

scala> import org.apache.spark.ml.{Pipeline, PipelineModel}
scala> import org.apache.spark.ml.feature.StandardScaler
scala> import org.apache.spark.ml.clustering.KMeans
scala> import org.apache.spark.ml.feature.VectorAssembler

// Construction de DataFrame avec les données de Spambase
scala> def genSpamFields(from: Int, to: Int) = for (i <- from until to)
                         yield StructField("val"+i.toString, DoubleType, true)
scala> val spamFields = genSpamFields(0, 57)
scala> val spamSchema = StructType(spamFields).add("label", DoubleType, true)
scala> val spamDF = spark.read.format("csv").schema(spamSchema).load("../tpacp/data/spambase.data")
scala> val colsEntree = {for (i <- 0 until 57) yield "val"+i.toString}.toArray
scala> val assembleur = new VectorAssembler().setInputCols(colsEntree).setOutputCol("features")
scala> val spamDFA = assembleur.transform(spamDF).cache()
// Partitionnement en données de train (80%) et données de test (20%)
scala> val spamSplits = spamDFA.randomSplit(Array(0.8, 0.2))

// Construction d'un pipeline
scala> val scaler = new StandardScaler().setInputCol("features")
                                        .setOutputCol("scaledFeatures")
                                        .setWithStd(true)
                                        .setWithMean(true)
scala> val kmeansNS = new KMeans().setFeaturesCol(scaler.getOutputCol)
                                  .setPredictionCol("predictionNS")
                                  .setK(5)
                                  .setMaxIter(200)
                                  .setSeed(1L)
scala> val pipeline = new Pipeline().setStages(Array(scaler, kmeansNS))

// Estimation des modèles du pipeline (sur données de train):
//   le premier modèle (normalisation) est estimé en premier,
//   ensuite il est utilisé comme transformer pour modifier les données
//   et c'est le second modèle qui est estimé (clustering)
//
scala> val modeleKMNS = pipeline.fit(spamSplits(0))

// Application du modèle pour prédire les groupes des données de test
//
scala> val indicesSpamTest = modeleKMNS.transform(spamSplits(1))
                                       .select("scaledFeatures","predictionNS")

scala> indicesSpamTest.select("scaledFeatures")
                    .map(v => v.toString.filter(c => c != '[' & c != ']'))
                    .write.text("data/vecteursTestSpam")
scala> indicesSpamTest.select("predictionNS")
                      .map(p => p.toString.filter(c => c != '[' & c != ']'))
                      .write.text("data/indicesTestSpam")

// Un pipeline peut être sauvegardé (avant estimation)
scala> pipeline.write.overwrite().save("spark-pipeline-normEtKMeans")

// Un modèle issu d'un pipeline (après estimation) peut être sauvegardé
scala> modeleKMNS.write.overwrite().save("spark-modele-clustering-spam")

// Un modèle sauvegardé peut être chargé pour être employé
scala> val memeModele = PipelineModel.load("spark-modele-clustering-spam")

Renommer les fichiers de sortie ~/tpkmeans/data/vecteursTestSpam/part-00000-... en ~/tpkmeans/data/vecteursTestSpam/vecteurs et respectivement ~/tpkmeans/data/indicesTestSpam/part-00000-... en ~/tpkmeans/data/indicesTestSpam/idGroupes. Regrouper ensuite les deux pour l’utilisation de gnuplot :

$ cd ~/tpkmeans/data
$ paste --delimiters="," vecteursTestSpam/vecteurs indicesTestSpam/idGroupes > donneesGnuplot.txt

Et enfin visualiser avec gnuplot :

$ cd ~/tpkmeans
$ gnuplot
gnuplot> set datafile separator ','
gnuplot> set palette defined ( 0 "red", 1 "orange", 2 "brown", 3 "green", 4 "blue" )
gnuplot> splot "data/donneesGnuplot.txt" using 1:2:3:58 with points lc palette

Cette visualisation est faite sur les trois premières dimensions. Quelles que soient les trois dimensions choisies (parmi les 57 de départ) pour visualiser les groupes, elles seront probablement peu représentatives de cette séparation en groupes. Quelle méthode employer afin d’améliorer la visualisation ?