Travaux pratiques - Classification automatique avec k-means¶
(une variante de ce TP utilisant le langage Scala)
Références externes utiles :
L’objectif de cette séance de TP est de présenter l’utilisation dans Spark de la méthode k-means de classification automatique (avec une initialisation par k-means||), ainsi que la définition et l’utilisation de pipelines (enchaînements de traitements successifs) dans Spark.
Nous travaillerons lors de cette séance de travaux pratiques d’abord sur des données générées. Vous aurez à traiter ensuite les données Spambase Data Set issues de l’archive de l’UCI. Nous vous incitons à regarder sur le site de l’UCI des explications plus détaillées concernant ces données, notamment la signification des variables et les classes auxquelles appartiennent les données.
Les données ont été générées en amont, en utilisant la fonction KMeansDataGenerator.generateKMeansRDD
de l’API Spark en langage Scala car aucun équivalent direct n’est disponible dans l’API Spark en Python. Cette fonction choisit d’abord k centres de groupes à partir d’une loi normale \(d\)-dimensionnelle, d’écart-type \(r\) et ensuite crée autour de chaque centre un groupe à partir d’une loi \(d\)-dimensionnelle d’écart-type 1. Les paramètres d’appel sont :
sc
: leSparkContext
employé (celui par défaut pourpyspark
ou celui créé explicitement pour Jupyter) ;numPoints
: le nombre total de données générées ;k
: le nombre de centres (donc de groupes dans les données générées) ;d
: la dimension des données ;r
: écart-type pour la loi normale qui génère les centres ;numPartitions
: nombre de partitions pour le RDD généré (2 par défaut, ici nous avons utilis& 1 seule).
(se référer à la documentation pour plus de détails)
Pour générer 1000 données bidimensionnelles dans 5 groupes nous avons employé, avec spark-shell
(en langage Scala) le programme suivant (ne cherchez pas à l’exécuter dans pyspark
ou Jupyter en Python, c’est un programme en Scala) :
import spark.implicits._
import org.apache.spark.mllib.util.KMeansDataGenerator
// Générer les données
val donneesGenereesDF = KMeansDataGenerator.generateKMeansRDD(sc, 1000, 5, 2, 5, 1)
.map(l => (l(0), l(1))).toDF("x","y")
donneesGenereesDF.printSchema()
donneesGenereesDF.show(2)
donneesGenereesDF.coalesce(1).write.format("com.databricks.spark.csv")
.option("header",true).save("data/1000donneesKmeans2d5centres.csv")
Les données obtenues sont bien des vecteurs à deux coordonnées.
Pour préparer les répertoires et télécharger ces données générées en amont, ouvrez une fenêtre terminal et entrez les commandes suivantes :
%%bash
mkdir -p tpkmeans/data
cd tpkmeans/data
wget http://cedric.cnam.fr/vertigo/Cours/RCP216/docs/1000donneesKmeans2d5centres.csv
cd ..
pyspark
Sous Windows, créez le répertoire tpkmeans
et le sous-répertoire data
, téléchargez le fichier de données dans le sous-répertoire data
, ouvrez une fenêtre invite de commandes, allez dans le répertoire tpkmeans
que vous avez créé et lancez pyspark
.
Lecture des données générées dans un DataFrame :
# Lecture des données et création du Dataframe
vecteursGroupesDF = spark.read.format("csv").option("header", True) \
.option("inferSchema", True) \
.load("tpkmeans/data/1000donneesKmeans2d5centres.csv") \
.cache()
print(vecteursGroupesDF.dtypes)
vecteursGroupesDF.show(5) # vérification
Nous pouvons visualiser les données bidimensionnelles :
import pandas as pd
import matplotlib.pyplot as plt
donneesAafficher = vecteursGroupesDF.toPandas()
donneesAafficher.plot.scatter('x', 'y')
plt.show()
Nous pouvons vérifier que les données ont bien été générées aléatoirement par cinq lois modérément bien séparées les unes des autres.
K-means avec initialisation par K-means||¶
La MLlib de Spark propose une implémentation de l’algorithme de classification automatique k-means.
L’initialisation des centres est réalisée par l’algorithme parallèle k-means||, vu aussi en cours, avec 5 itérations (initSteps: 5
, peut être modifié avec KMeans.setInitSteps
).
La classification est obtenue en appelant KMeans.fit
. Différents paramètres peuvent être définis avec les méthodes suivantes (à appeler pour l’instance de KMeans
créée) :
setFeaturesCol(value: String)
: nom de la colonne qui contient les données à classifier, sous forme de vecteurs (par défaut"features"
) ;setK(value: Int)
: nombre de groupes ou clusters à obtenir (par défaut 2) ;setMaxIter(value: Int)
: nombre maximal d’itérations ; arrêt de l’exécution lorsque les centres sont stabilisés (paramètretol
, voirsetTol
ci-dessous) oumaxIter
est atteint ;setPredictionCol(value: String)
: nom de la colonne qui contient les « prédictions », c’est à dire le numéro de groupe pour chaque observation (par défaut"prediction"
) ;setSeed(value: long)
: valeur d’initialisation du générateur de nombres aléatoires (la préciser permet de reproduire les résultats ultérieurement) ;setTol(value: Double)
: définition de la valeur de la tolérance (tol
) pour l’évaluation de la convergence de l’algorithme ;setInitMode(value: String)
: méthode d’initialisation, soit"random"
, soit"k-means||"
(par défaut"k-means||"
) ;setInitSteps(value: Int)
: nombre de pas pour l’initialisation park-means||
(2 par défaut).
Se référer à la documentation pour plus de détails.
Voici un exemple de code Python pour Spark permettant de réaliser la classification automatique (clustering) avec KMeans. Commençons par assembler les colonnes individuelles :
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
# Construction du VectorAssembler
assembleur = VectorAssembler().setInputCols(["x","y"]).setOutputCol("features")
# Construction du Dataframe spamDFA en appliquant le VectorAssembler
vecteursGroupesDFA = assembleur.transform(vecteursGroupesDF)
vecteursGroupesDFA.printSchema()
vecteursGroupesDFA.select("features").show(3)
Nous pouvons maintenant appliquer k-means à notre DataFrame :
from pyspark.ml.clustering import KMeans
# Appliquer k-means
kmeans = KMeans().setK(5).setMaxIter(200).setSeed(100)
modele = kmeans.fit(vecteursGroupesDFA)
Pour obtenir les indices des groupes pour chaque observation, il suffit d’appliquer la méthode transform
au DataFrame souhaité :
# Trouver l'indice de groupe pour chaque donnée
resultats = modele.transform(vecteursGroupesDFA)
resultats.show(5)
Le DataFrame resultats
contient alors une colonne prediction
qui, pour chaque observation du jeu de données initial, correspond au numéro du groupe (cluster) dans lequel l’algorithme des k-moyennes l’a affectée.
Nous pouvons évaluer la qualité de cette classification à travers le coefficient de silhouette (entre -1 et 1, d’autant plus proche de 1 que les données d’un même groupe sont proches entre elles et éloignées des données des autres groupes) et afficher les coordonnées des centres des groupes trouvés. Noter que la somme des inerties intra-groupes obtenue à la fin de la classification automatique n’est plus disponible à partir de Spark 3.0.
# Évaluer la classification par le coefficient de silhouette
from pyspark.ml.evaluation import ClusteringEvaluator
evaluateur = ClusteringEvaluator()
silhouette = evaluateur.evaluate(resultats)
print("Silhouette : " + str(silhouette))
# Afficher les centres des groupes
centres = modele.clusterCenters()
print("Centres : ")
for centre in modele.clusterCenters():
print(centre)
Le DataFrame resultat
contient alors une colonne prediction
qui, pour chaque observation du jeu de données initial, correspond au numéro du groupe (cluster) dans lequel l’algorithme des k-moyennes l’a affectée.
La visualisation se fait en indiquant que la couleur est donnée par la valeur dans la colonne prediction
:
import matplotlib.pyplot as plt
donneesAafficher = resultats.select("x","y","prediction").toPandas()
donneesAafficher.plot.scatter('x', 'y', c='prediction', cmap='coolwarm')
plt.show()
Questions¶
Question :
Réalisez une nouvelle fois la classification des mêmes données avec k-means en 5 groupes mais une valeur différente dans .setSeed()
. Visualisez à nouveau les résultats (mais en gardant les graphes initiaux ouverts pour pouvoir comparer). Que constatez-vous ?
Question :
Réalisez deux fois la classification des mêmes données avec k-means en 5 groupes mais avec une initialisation random
plutôt que k-means||
et des valeurs différentes dans .setSeed()
. Que constatez-vous ?
Question :
Réalisez la classification des mêmes données avec k-means et une initialisation k-means||
en 4 groupes et ensuite en 6 groupes. Visualisez à chaque fois les résultats. Que constatez-vous ?
Question (optionnelle) :
Multipliez par 5 les données sur la seconde des dimensions initiales en vous servant de ElementwiseProduct. Réalisez la classification des nouvelles données avec k-means (et initialisation par k-means||
) en 5 groupes. Visualisez les résultats. Que constatez-vous ?
Question (optionnelle) :
Réalisez la classification des données Spambase Data Set issues de l’archive de l’UCI. Comment pré-traiter les données et pourquoi ? Combien de groupes rechercher ? Visualisez ensuite leurs projections sur des groupes de 3 variables.
Les flux de travail (ML pipelines)¶
Spark permet de définir des flux de travail (ou workflows, ou ML pipelines) qui enchaînent différentes étapes de traitement des données se trouvant dans des DataFrames. Les composants des pipelines sont les transformers (transformateurs) et les estimators (estimateurs).
Un transformer est un algorithme qui transforme un DataFrame en un autre DataFrame, en général par ajout d’une ou plusieurs colonnes. La transformation peut être, par exemple, le centrage et la réduction des variables numériques (transformer chaque variable pour qu’elle soit de moyenne nulle et écart-type égal à 1) qui décrivent les observations ou la transformation de textes en vecteurs. Chaque transformer possède une méthode .transform()
qui, en général, crée une ou plusieurs colonnes et les ajoute au DataFrame reçu en entrée.
Un estimator est un algorithme qui permet de construire un modèle à partir de données d’un DataFrame. Une fois construit, ce modèle sera un transformer. Chaque estimator possède une méthode .fit()
. Par exemple, pour centrer et réduire les variables numériques décrivant des observations il est nécessaire d’utiliser un estimator qui détermine les moyennes et les écart-types à partir des données ; ces moyennes et écart-types constituent le modèle qui, appliqué comme un transformer, permet ensuite d’obtenir le centratge et la réduction pour les données sur lesquelles il a été obtenu (avec .fit()
) mais aussi sur d’autres données (par exemple, de nouvelles observations concernant le même problème). Nous avons déjà vu la construction d’un modèle avec .fit()
et son utilisation ultérieure avec .transform()
dans le TP précédent lorsque nous avons centré et réduit les variables avant application de l’ACP. Dans cette séance de TP nous examinerons la construction d’un modèle descriptif par classification automatique, réalisée par un estimator. Le modèle résultant est un transformer qui, appliqué aux données d’un DataFrame, fournit pour chaque observation le numéro du centre de groupe le plus proche (donc le numéro du groupe auquel cette observation est « affectée »).
Les transformers et les estimators emploient une interface commune parameter pour spécifier les paramètres utilisés.
Un pipeline permet d’enchaîner des composants qui, au départ, peuvent être des estimators et, après estimation, sont employés comme des transformers. Le pipeline dispose ainsi
d’une méthode
.fit()
qui permet d’enchaîner les opérations d’estimation pour tous les estimators composant le pipeline,ainsi que d’une méthode
.transform()
qui enchaîne les traitements des données par les différents modèles estimés, devenus des transformers.
Dans la documentation de Spark, ce processus est illustré dans cet exemple. Nous aborderons un autre exemple de pipeline dans la suite de cette séance.
Construction de pipeline simple¶
Afin de centrer et réduire les données Spambase Data Set et d’appliquer ensuite la classification automatique, construisons un pipeline qui regroupe ces deux opérations :
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from pyspark.sql.types import DoubleType, StructType, StructField
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.clustering import KMeans
# Génération des StructField correspondant aux 57 premières colonnes
spamFields = [StructField("val"+str(i), DoubleType(), True) for i in range(0, 57)]
# Construction du schéma complet du Dataframe (incluant la dernière colonne)
spamSchema = StructType(spamFields).add("label", DoubleType(), True)
# Lecture des données et création du Dataframe
# corriger si nécessaire la localisation de spambase.data
spamDF = spark.read.format("csv").schema(spamSchema).load("tpacp/data/spambase.data").cache()
# Construction du VectorAssembler
colsEntree = ["val"+str(i) for i in range(0, 57)]
assembleur = VectorAssembler().setInputCols(colsEntree).setOutputCol("features")
# Construction du Dataframe spamDFA en appliquant le VectorAssembler
spamDFA = assembleur.transform(spamDF).cache()
# Partitionnement en données d'apprentissage (80%) et données de test (20%), avec seed=1
spamSplits = spamDFA.randomSplit([0.8, 0.2], 1)
print(spamSplits[0].count())
print(spamSplits[1].count())
# Construction d'un pipeline
scaler = StandardScaler().setInputCol("features") \
.setOutputCol("scaledFeatures") \
.setWithStd(True) \
.setWithMean(True)
kmeansNS = KMeans().setFeaturesCol(scaler.getOutputCol()) \
.setPredictionCol("predictionNS") \
.setK(5) \
.setMaxIter(200) \
.setSeed(1)
pipeline = Pipeline().setStages([scaler, kmeansNS])
L’objet pipeline
contient désormais deux opérations : la normalisation (scaler
) et la classification (kmeansNS
). Ces deux opérations (stages) seront appliquées successivement sur les données passées aux méthodes fit
et transform
du pipeline. Par exemple, nous pouvons réaliser l’apprentissage du modèle KMeans avec normalisation de la façon suivante :
# Estimation des modèles du pipeline (sur données *train*):
# le premier modèle (centrage et réduction) est estimé en premier,
# ensuite il est utilisé comme transformer pour modifier les données
# et c'est le second modèle qui est estimé (clustering)
modeleKMNS = pipeline.fit(spamSplits[0])
Et l’ensemble peut s’appliquer en inférence sur de nouvelles données comme suit :
# Application du modèle pour prédire les groupes des données de test
indicesSpamTest = modeleKMNS.transform(spamSplits[1]).select("scaledFeatures","predictionNS")
indicesSpamTest.show(5)
Un pipeline (ou n’importe lequel de ses sous-modèles) peut être sauvegardé pour être réutilisé plus tard :
# Un pipeline peut être sauvegardé (avant estimation)
pipeline.write().overwrite().save("tpkmeans/spark-pipeline-normEtKMeans")
# Un modèle issu d'un pipeline (après estimation) peut être sauvegardé
modeleKMNS.write().overwrite().save("tpkmeans/spark-modele-clustering-spam")
# Un modèle sauvegardé peut être chargé pour être employé ultérieurement
memeModele = PipelineModel.load("tpkmeans/spark-modele-clustering-spam")
Regardez la documentation pour la méthode de normalisation robuste des variables.
Question
Réaliser la visualisation de la classification automatique sur le jeu de données Spambase. Cette visualisation est faite sur les deux premières dimensions. Quelles que soient les deux dimensions choisies (parmi les 57 de départ) pour visualiser les groupes, elles seront probablement peu représentatives de cette séparation en groupes. Quelle méthode employer afin d’améliorer la visualisation ?