Travaux pratiques - Classification automatique avec k-means

(la version précédente de cette séance, utilisant l’API RDD)

Références externes utiles :

L’objectif de cette séance de TP est de présenter l’utilisation dans Spark de la méthode k-means de classification automatique (avec une initialisation par k-means||), ainsi que la définition et l’utilisation de pipelines (enchaînements de traitements successifs) dans Spark.

Nous travaillerons lors de cette séance de travaux pratiques d’abord sur des données générées pendant la séance. Vous aurez à traiter ensuite les données Spambase Data Set issues de l’archive de l’UCI. Nous vous incitons à regarder sur le site de l’UCI des explications plus détaillées concernant ces données, notamment la signification des variables et les classes auxquelles appartiennent les données.

Nous allons pour cette séance utiliser Vegas-viz comme outil de visualisation. Avec la gestion dynamique des dépendances pour Jupyter/Toree, il suffit d’utiliser la commande magique %AddDeps pour importer les paquets adéquats :

// Vegas v0.3.11 pour Scala 2.11
// L'option --transitive permet de télécharger également les dépendances de Vegas

%AddDeps org.vegas-viz vegas_2.11 0.3.11 --transitive
%AddDeps org.vegas-viz vegas-spark_2.11 0.3.11 --transitive

Génération des données

Nous nous servirons de KMeansDataGenerator.generateKMeansRDD pour générer un jeu de données synthétique sur lequel nous appliquerons le clustering. Ces données sont produites d’abord sous la forme d’un RDD qui est ensuite transformé en DataFrame. Cette méthode choisit d’abord k centres de groupes à partir d’une loi normale \(d\)-dimensionnelle, d’écart-type \(r\) et ensuite crée autour de chaque centre un groupe à partir d’une loi normale \(d\)-dimensionnelle d’écart-type 1. Les paramètres d’appel sont :

  • sc : le SparkContext employé (ici, celui par défaut de spark-shell) ;

  • numPoints : le nombre total de données générées ;

  • k : le nombre de centres (donc de groupes dans les données générées) ;

  • d : la dimension des données ;

  • r : écart-type pour la loi normale qui génère les centres ;

  • numPartitions : nombre de partitions pour le RDD généré (2 par défaut, ici nous utiliserons 1 seule).

(se référer à la documentation pour plus de détails)

Nous générerons 1000 données bidimensionnelles dans 5 groupes en utilisant :

import org.apache.spark.sql.Row
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.mllib.util.KMeansDataGenerator

// Générer les données
val donneesGenerees = KMeansDataGenerator.generateKMeansRDD(sc, 1000, 5, 2, 5, 1)
                                                .map(l => Row(l(0), l(1), Vectors.dense(l)))
donneesGenerees.take(2)

Les données obtenues sont bien des vecteurs à deux coordonnées. Nous pouvons maintenant transformer ce RDD en DataFrame contenant une seule colonne features de type Vector :

// Construction d'un DataFrame à partir du RDD
import org.apache.spark.sql.types._
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType

val schemaVecteurs = StructType(Seq(StructField("x", DoubleType, true), StructField("y", DoubleType, true), StructField("features", VectorType, true)))
val vecteursGroupesDF = spark.createDataFrame(donneesGenerees, schemaVecteurs).cache()
vecteursGroupesDF.show(false)

Nous pouvons visualiser les points comme dans les TP précédents en s’appuyant sur Vegas :

// Import des bibliothèques de Vegas
import vegas._
implicit val render = vegas.render.ShowHTML(kernel.display.content("text/html", _))

import vegas.data.External._
import vegas.sparkExt._

// Construction du nuage de points
val points = vecteursGroupesDF
Vegas("Données initiales").withDataFrame(points).mark(Point).encodeX("x", Quant).encodeY("y", Quant).show

Nous pouvons vérifier que les données ont bien été générées aléatoirement par cinq gaussiennes modérément bien séparées les unes des autres.

K-means avec initialisation par K-means||

La MLlib de Spark propose une implémentation de l’algorithme de classification automatique k-means. L’initialisation des centres est réalisée par l’algorithme parallèle k-means||, vu aussi en cours, avec 5 itérations (initSteps: 5, peut être modifié avec KMeans.setInitSteps).

La classification est obtenue en appelant KMeans.fit. Différents paramètres peuvent être définis avec les méthodes suivantes (à appeler pour l’instance de KMeans créée) :

  • setFeaturesCol(value: String) : nom de la colonne qui contient les données à classifier, sous forme de vecteurs (par défaut "features") ;

  • setK(value: Int) : nombre de groupes ou clusters à obtenir (par défaut 2) ;

  • setMaxIter(value: Int) : nombre maximal d’itérations ; arrêt de l’exécution lorsque les centres sont stabilisés (paramètre tol, voir setTol ci-dessous) ou maxIter est atteint ;

  • setPredictionCol(value: String) : nom de la colonne qui contient les « prédictions », c’est à dire le numéro de groupe pour chaque observation (par défaut "prediction") ;

  • setSeed(value: long) : valeur d’initialisation du générateur de nombres aléatoires (la préciser permet de reproduire les résultats ultérieurement) ;

  • setTol(value: Double) : définition de la valeur de la tolérance (tol) pour l’évaluation de la convergence de l’algorithme ;

  • setInitMode(value: String) : méthode d’initialisation, soit "random", soit "k-means||" (par défaut "k-means||") ;

  • setInitSteps(value: Int) : nombre de pas pour l’initialisation par k-means|| (2 par défaut).

Se référer à la documentation pour plus de détails.

Voici un exemple de code Scala pour Spark permettant de réaliser la classification automatique (clustering) avec KMeans. Commençons par appliquer k-means à notre DataFrame :

import org.apache.spark.ml.clustering.KMeans

// Appliquer k-means
val kmeans = new KMeans().setK(5).setMaxIter(200).setSeed(1L)
val modele = kmeans.fit(vecteursGroupesDF)

Une fois ceci fait, nous pouvons calculer l’inertie globale de cette classification (cela permet de comparer deux classifications différentes pour choisir la meilleure de \(k\), par exemple) et afficher les coordonnées des centres des groupes trouvés.

// Évaluer la classification par la somme des inerties intra-classe
val wsse = modele.computeCost(vecteursGroupesDF)
// Afficher les centres des groupes
modele.clusterCenters.foreach(println)

Pour obtenir les indices des groupes et réaliser la classification à proprement parler, il suffit d’appliquer la méthode transform au DataFrame souhaité :

// Trouver l'indice de groupe pour chaque donnée
val resultat = modele.transform(vecteursGroupesDF)
resultat.show(5)

Le DataFrame resultat contient alors une colonne prediction qui, pour chaque observation du jeu de données initial, correspond au numéro du groupe (cluster) dans lequel l’algorithme des k-moyennes l’a affectée.

La visualisation avec Vegas se fait en ajoutant l’option encodeColor en utilisant la valeur de la colonne prediction :

val points = resultat
Vegas("K-Means").withDataFrame(points).mark(Point).encodeX("x", Quant).encodeY("y", Quant).encodeColor("prediction", Nom).show

Questions

Question :

Réalisez une nouvelle fois la classification des mêmes données avec k-means en 5 groupes mais une valeur différente dans .setSeed(). Visualisez à nouveau les résultats (mais en gardant les graphes initiaux ouverts pour pouvoir comparer). Que constatez-vous ?

Correction :

En général les indices des groupes changent : un même groupe n’a pas la même couleur dans les deux fenêtres de visualisation. En revanche, les regroupements obtenus sont souvent les mêmes entre les deux exécutions de k-means.

Question :

Réalisez deux fois la classification des mêmes données avec k-means en 5 groupes mais avec une initialisation random plutôt que k-means|| et des valeurs différentes dans .setSeed().

Que constatez-vous ?

Correction :

val kmeansR = new KMeans().setK(5).setMaxIter(200).setInitMode("random").setSeed(1L)
val modeleR = kmeansR.fit(vecteursGroupesDF)
val wsseR = modeleR.computeCost(vecteursGroupesDF)

Non seulement les indices des groupes changent, mais en plus les regroupements obtenus peuvent être différents entre les deux exécutions de k-means (même si cela ne se produit pas systématiquement, surtout si les groupes générés au départ sont bien séparés entre eux). Les résultats dépendent plus de l’initialisation lorsque celle-ci est aléatoire plutôt que réalisée avec k-means||.

Question :

Réalisez la classification des mêmes données avec k-means et une initialisation k-means|| en 4 groupes et ensuite en 6 groupes. Visualisez à chaque fois les résultats. Que constatez-vous ?

Correction :

Avec 4 groupes, les groupes précédents les plus proches sont fusionnés. Avec 6 groupes, un des groupes précédents est divisé en deux. De plus, en exécutant la classification plusieurs fois, on peut constater une relative dépendance des résultats de l’intialisation malgré l’utilisation de k-means||.

Question (optionnelle) :

Multipliez par 3 les données sur une des dimensions initiales en vous servant de ElementwiseProduct. Réalisez la classification des nouvelles données avec k-means (et initialisation par k-means||) en 5 groupes. Visualisez les résultats. Que constatez-vous ?

Correction :

Nous nous servons de ElementwiseProduct :

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.feature.ElementwiseProduct

// On contracte le nuage d'observations initial par 5 selon l'axe x
val amplifications = Vectors.dense(0.2, 1.0)
val amplifieur = new ElementwiseProduct().setScalingVec(amplifications).setInputCol("features").setOutputCol("featuresMod")
val vecteursGroupesModDF = amplifieur.transform(vecteursGroupesDF)

val kmeansMod = new KMeans().setFeaturesCol("featuresMod").setK(5).setMaxIter(200).setSeed(1L)
val modeleMod = kmeansMod.fit(vecteursGroupesModDF)
val wsseMod = modeleMod.computeCost(vecteursGroupesModDF)
val indicesMod = modeleMod.transform(vecteursGroupesModDF)

On constate que les regroupements ne sont plus les mêmes (des exceptions sont néanmoins possibles), car la distance entre les données est modifiée par la « dilatation » importante d’une des variables.

Question (optionnelle) :

Réalisez la classification des données Spambase Data Set issues de l’archive de l’UCI. Comment pré-traiter les données et pourquoi ? Combien de groupes rechercher ? Visualisez ensuite leurs projections sur des groupes de 3 variables.

Correction :

Les variables ne sont pas de variances comparables ; sans centrage et réduction, les variables de variance très élevée auront un impact déterminant sur les résultats de la classification automatique. Il est donc envisageable de centrer et réduire d’abord les variables.

Les données correspondent à 2 classes, « spam » et « non spam », il donc est envisageable de faire d’abord une classification automatique en 2 groupes. Il est ensuite possible de rechercher plus de 2 groupes car dans chaque classe les données peuvent former plusieurs groupes distincts.

La visualisation des projections sur des groupes de 3 variables peut être peu informative car au départ il y a 57 variables. Le TP optionnel sur l’analyse factorielle discriminante montre comment il est possible d’obtenir une visualisation en 3D dans laquelle la séparation entre groupes est optimisée.

Les flux de travail (ML pipelines)

Spark permet de définir des flux de travail (ou workflows, ou ML pipelines) qui enchaînent différentes étapes de traitement des données se trouvant dans des DataFrames. Les composants des pipelines sont les transformers (transformateurs) et les estimators (estimateurs).

Un transformer est un algorithme qui transforme un DataFrame en un autre DataFrame, en général par ajout d’une ou plusieurs colonnes. La transformation peut être, par exemple, le centrage et la réduction des variables numériques (transformer chaque variable pour qu’elle soit de moyenne nulle et écart-type égal à 1) qui décrivent les observations ou la transformation de textes en vecteurs. Chaque transformer possède une méthode .transform() qui, en général, crée une ou plusieurs colonnes et les ajoute au DataFrame reçu en entrée.

Un estimator est un algorithme qui permet de construire un modèle à partir de données d’un DataFrame. Une fois construit, ce modèle sera un transformer. Chaque estimator possède une méthode .fit(). Par exemple, pour centrer et réduire les variables numériques décrivant des observations il est nécessaire d’utiliser un estimator qui détermine les moyennes et les écart-types à partir des données ; ces moyennes et écart-types constituent le modèle qui, appliqué comme un transformer, permet ensuite d’obtenir le centratge et la réduction pour les données sur lesquelles il a été obtenu (avec .fit()) mais aussi sur d’autres données (par exemple, de nouvelles observations concernant le même problème). Nous avons déjà vu la construction d’un modèle avec .fit() et son utilisation ultérieure avec .transform() dans le TP précédent lorsque nous avons centré et réduit les variables avant application de l’ACP. Dans cette séance de TP nous examinerons la construction d’un modèle descriptif par classification automatique, réalisée par un estimator. Le modèle résultant est un transformer qui, appliqué aux données d’un DataFrame, fournit pour chaque observation le numéro du centre de groupe le plus proche (donc le numéro du groupe auquel cette observation est « affectée »).

Les transformers et les estimators emploient une interface commune parameter pour spécifier les paramètres utilisés.

Un pipeline permet d’enchaîner des composants qui, au départ, peuvent être des estimators et, après estimation, sont employés comme des transformers. Le pipeline dispose ainsi

  1. d’une méthode .fit() qui permet d’enchaîner les opérations d’estimation pour tous les estimators composant le pipeline,

  2. ainsi que d’une méthode .transform() qui enchaîne les traitements des données par les différents modèles estimés, devenus des transformers.

Dans la documentation de Spark, ce processus est illustré dans cet exemple. Nous aborderons un autre exemple de pipeline dans la suite de cette séance.

Construction de pipeline simple

Afin de centrer et réduire les données Spambase Data Set et d’appliquer ensuite la classification automatique, construisons un pipeline qui regroupe ces deux opérations :

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler

// Construction de DataFrame avec les données de Spambase
def genSpamFields(from: Int, to: Int) = for (i <- from until to)
                         yield StructField("val"+i.toString, DoubleType, true)
val spamFields = genSpamFields(0, 57)
val spamSchema = StructType(spamFields).add("label", DoubleType, true)
val spamDF = spark.read.format("csv").schema(spamSchema).load("data/spambase.data")
val colsEntree = {for (i <- 0 until 57) yield "val"+i.toString}.toArray
val assembleur = new VectorAssembler().setInputCols(colsEntree).setOutputCol("features")
val spamDFA = assembleur.transform(spamDF).cache()
// Partitionnement en données de train (80%) et données de test (20%)
val spamSplits = spamDFA.randomSplit(Array(0.8, 0.2))

// Construction d'un pipeline
val scaler = new StandardScaler().setInputCol("features")
                                        .setOutputCol("scaledFeatures")
                                        .setWithStd(true)
                                        .setWithMean(true)
val kmeansNS = new KMeans().setFeaturesCol(scaler.getOutputCol)
                                  .setPredictionCol("predictionNS")
                                  .setK(5)
                                  .setMaxIter(200)
                                  .setSeed(1L)
val pipeline = new Pipeline().setStages(Array(scaler, kmeansNS))

L’objet pipeline contient désormais deux opérations : la normalisation (scaler) et la classification (kmeansNS). Ces deux opérations (stages) seront appliquées successivement sur les données passées aux méthodes fit et transform du pipeline. Par exemple, nous pouvons réaliser l’apprentissage du modèle KMeans avec normalisation de la façon suivante :

// Estimation des modèles du pipeline (sur données *train*):
//   le premier modèle (centrage et réduction) est estimé en premier,
//   ensuite il est utilisé comme transformer pour modifier les données
//   et c'est le second modèle qui est estimé (clustering)
//
val modeleKMNS = pipeline.fit(spamSplits(0))

Et l’ensemble peut s’appliquer en inférence sur de nouvelles données comme suit :

// Application du modèle pour prédire les groupes des données de test
//
val indicesSpamTest = modeleKMNS.transform(spamSplits(1)).select("scaledFeatures","predictionNS")

Un pipeline (ou n’importe lequel de ses sous-modèles) peut être sauvegardé pour être réutilisé plus tard :

// Un pipeline peut être sauvegardé (avant estimation)
pipeline.write.overwrite().save("spark-pipeline-normEtKMeans")

// Un modèle issu d'un pipeline (après estimation) peut être sauvegardé
modeleKMNS.write.overwrite().save("spark-modele-clustering-spam")

// Un modèle sauvegardé peut être chargé pour être employé
val memeModele = PipelineModel.load("spark-modele-clustering-spam")

Question

Réaliser la visualisation avec Vegas de la classification automatique sur le jeu de données Spambase. Cette visualisation est faite sur les deux premières dimensions. Quelles que soient les deux dimensions choisies (parmi les 57 de départ) pour visualiser les groupes, elles seront probablement peu représentatives de cette séparation en groupes. Quelle méthode employer afin d’améliorer la visualisation ?