Travaux pratiques - Fouille de flux de données

Références externes utiles :

Cette séance portera sur la classification automatique de données vectorielles issues d’un flux de données, voir la documentation Spark Streaming k-means. Nous travaillerons sur des données générées en amont de la séance de travaux pratiques.

Dans la version 2.2.0 de Spark, l’API Dataset / Dataframe ne permet pas l’utilisation directe d’algorithmes de type Streaming K-means. Pour cette séance de TP nous nous servirons donc de l’API RDD.

Récupération des données

Ouvrez une fenêtre terminal (nous l’appellerons « fenêtre de contrôle » dans la suite) et entrez :

$ mkdir -p tpflux/data
$ cd tpflux/data
$ wget http://cedric.cnam.fr/~crucianm/src/full.zip
$ unzip full.zip

Si la commande spark-shell n’est pas trouvée, entrez d’abord export PATH="$PATH:/opt/spark/bin" et ensuite spark-shell.

La commande unzip full.zip crée un répertoire full dans data, dans lequel se trouvent 1000 fichiers de 100 données tridimensionnelles chacun, qui correspondent aux « tranches » du flux de données traitées par la classification automatique de flux (streaming k-means).

Pour votre information, ces 100.000 données tridimensionnelles dans 7 groupes ont été générées de façon similaire aux données employées dans la séance sur la classification automatique, en utilisant les commandes suivantes dans spark-shell lancé dans le répertoire ~/tpflux (attention, n’entrez pas ces commandes, vous venez de télécharger ces données) :

scala> import org.apache.spark.mllib.linalg.Vectors
scala> import org.apache.spark.mllib.util.KMeansDataGenerator

// Génération des données
scala> val donneesGenerees = KMeansDataGenerator.generateKMeansRDD(sc, 100000, 7, 3, 5, 1)
scala> val donnees = donneesGenerees.map(s => Vectors.dense(s)).cache()

// Enregistrement des données dans un fichier texte
scala> val donneesTxt = donnees.map(l => l.toString)
scala> donneesTxt.saveAsTextFile("data/donnees")

Toujours pour votre information, ces données ont ensuite été transformées : les lignes du fichier data/donnees/part-00000 ont été retriées dans un ordre aléatoire avec l’utilitaire linux shuf et ensuite découpées en groupes de 100 lignes dans des fichiers différents dans le répertoire full/ (attention, n’entrez pas ces commandes, vous avez déjà ces données) :

$ shuf data/donnees/part-00000 > alldatashuffle
$ cd data
$ mkdir full
$ split -l 100 -a 3 alldatashuffle full/

Préparation des scripts nécessaires

Pour expérimenter avec la classification automatique de données issues d’un flux, nous avons besoin d’un premier script linux qui prendra un par un les fichiers avec les « tranches » (100 vecteurs tridimensionnels) et les copiera dans un répertoire stream. Le traitement de flux de données de Spark prendra chaque nouveau fichier du répertoire stream, lui appliquera les mêmes opérations (correspondant à une classification automatique incrémentale, voir streaming k-means et générera des étiquettes de groupe (cluster) pour chaque donnée de chaque « tranche ». Afin de nous permettre de visualiser les résultats, le même script linux prendra ces étiquettes et les ajoutera au fichier de données de la « tranche » correspondante, dans un format adapté à la visualisation avec gnuplot.

Pour écrire ce script linux, ouvrez un éditeur de texte, copiez dans cet éditeur les lignes suivantes et enregistrez le fichier sous le nom prepfiles.sh dans le répertoire ~/tpflux/data :

dir=$1      # 1er paramètre : répertoire source
newdir=$2   # 2ème paramètre : répertoire d'entrée du flux
newdir2=$3  # 3ème paramètre : répertoire de préparation pour gnuplot
for file in $dir/*  # pour chaque fichier source
do
    filename=$(basename $file)     # extraire le nom du fichier du nom complet
    cp $file $newdir/$filename     # copier le fichier dans le répertoire flux
    # copier le fichier, après avoir supprimé le premier et dernier caractère de chaque ligne,
    #  dans le répertoire pour gnuplot
    cat $file | sed 's/.//;s/.$//' > $newdir2/$filename
    sleep 10      # attendre 10 secondes
    # préparer les données pour gnuplot
    paste --delimiters="," $newdir2/$filename indices/part-00000 >> donneesGnuplot.txt
    rm -rf indices   # préparer une nouvelle écriture de la sortie du flux
done

Il est ensuite nécessaire de rendre ce fichier prepfiles.sh exécutable. Pour cela, dans la fenêtre terminal déjà utilisée, entrez :

$ chmod a+x prepfiles.sh

Les résultats seront affichés avec gnuplot ; pour que l’affichage puisse être mis à jour après le traitement de chaque nouvelle « tranche » du flux, nous utiliserons gnuplot avec un fichier script show.gp dans lequel nous incluerons l’instruction reread. Pour écrire ce script gnuplot, ouvrez un éditeur de texte, copiez dans cet éditeur les lignes suivantes et enregistrez le fichier sous le nom show.gp dans le répertoire ~/tpflux/data :

set datafile separator ','
set palette defined ( 0 "red", 1 "orange", 2 "brown", 3 "green", 4 "blue", 5 "pink", 6 "magenta")
splot "donneesGnuplot.txt" using 1:2:3:4 with points lc palette
pause 10
reread

Vérifiez si gnuplot est installé sur l’ordinateur sur lequel vous travaillez : entrez gnuplot dans la fenêtre de contrôle, si le programme est installé alors vous aurez un prompt gnuplot>, quittez en entrant quit. Si gnuplot n’est pas installé il faudra l’installer, entrez sudo yum install gnuplot (ou la commande appropriée à votre distribution linux) dans la fenêtre de contrôle. Dans la salle de TP gnuplot est déjà installé.

Nous pouvons maintenant passer à la mise en œuvre de la classification automatique des données en flux.

Classification et visualisation des résultats

Regardez d’abord streaming k-means. Ensuite, positionnez-vous dans le répertoire ~/tpflux ; pour cela, ouvrez une nouvelle fenêtre terminal (nous l’appellerons « fenêtre Spark » dans la suite), entrez cd ~/tpflux), lancez spark-shell et entrez :

// Importation des librairies
scala> import org.apache.spark._
scala> import org.apache.spark.streaming._
scala> import org.apache.spark.mllib.linalg.Vectors
scala> import org.apache.spark.mllib.regression.LabeledPoint
scala> import org.apache.spark.mllib.clustering.StreamingKMeans

// Création du StreamingContext : le flux sera découpés en "tranches" de 10 secondes,
//   chaque tranche sera un RDD
scala> val ssc = new StreamingContext(sc, Seconds(10))

// Lecture du flux de données d'entree : les nouveaux fichiers déposés dans
//   "data/stream" seront rassemblés dans un RDD toutes les 10s et traités
scala> val trainingData = ssc.textFileStream("data/stream").map(Vectors.parse)

// Paramétrage de Streaming k-means
scala> val numDimensions = 3   // données 3D
scala> val numClusters = 7     // recherche de 7 groupes
scala> val model = new StreamingKMeans()  // création d'un modèle
// Initialisation des paramètres du modèle
scala> model.setK(numClusters)
scala> model.setDecayFactor(0.5)   // valeur de alpha
scala> model.setRandomCenters(numDimensions, 0.0)

// Indication du flux sur lequel le modèle est appris (les groupes sont trouvés)
scala> model.trainOn(trainingData)

// Indication du flux sur lequel les prédictions sont faites par le modèle
scala> model.predictOn(trainingData).foreachRDD(rdd => rdd.saveAsTextFile("data/indices"))

Pour démarrer le traitement, dans la fenêtre de contrôle entrez (démarrage de la « source » du flux) :

$ mkdir stream
$ mkdir points
$ ./prepfiles.sh full stream points

Ensuite (rapidement), dans la fenêtre Spark, entrez (démarrage du traitement du flux) :

scala> ssc.start()

Enfin, ouvrez une autre fenêtre terminal (nous l’appellerons fenêtre Gnuplot) et entrez (démarrage de la visualisation des résultats) :

$ cd tpflux/data
$ gnuplot show.gp

Regardez l’évolution du graphique affiché par gnuplot.

Pour arrêter le processus, entrez d’abord dans la fenêtre Spark (arrêt du StreamingContext) :

scala> ssc.stop(false,true)

Cela demande l’arrêt du traitement du flux, sans arrêt du SparkContext (valeur false pour le premier paramètre) et « avec grâce » (gracefully), c’est à dire en attendant que toutes les données reçues soient traitées (valeur true pour le second paramètre).

Ensuite entrez Ctrl-C (appuyez simultanément sur les touches ‘Ctrl’ et ‘c’ du clavier) dans la fenêtre de contrôle et enfin Ctrl-C dans la fenêtre Gnuplot. Si vous voulez conserver la visualisation courante, évitez d’arrêter gnuplot avec Ctrl-C, modifiez le nom du fichier à afficher (donneesGnuplot.txt) dans prepfiles.sh et dans show.gp, ouvrez une autre fenêtre terminal et lancez gnuplot dans cette autre fenêtre la prochaine fois.

Pour relancer tout le processus, il est nécessaire de faire d’abord un peu de nettoyage. Dans la fenêtre de contrôle, entrez :

$ rm -f stream/* points/* donneesGnuplot.txt
$ rm -rf indices

Il est ensuite nécessaire de recréer un StreamingContext et un modèle. Dans la fenêtre Spark, entrez :

scala> val ssc = new StreamingContext(sc, Seconds(10))

scala> val trainingData = ssc.textFileStream("data/stream").map(Vectors.parse)
scala> val model = new StreamingKMeans()  // création d'un modèle
scala> model.setK(numClusters)
scala> model.setDecayFactor(0.5)
scala> model.setRandomCenters(numDimensions, 0.0)
scala> model.trainOn(trainingData)
scala> model.predictOn(trainingData).foreachRDD(rdd => rdd.saveAsTextFile("data/indices"))

Enfin, relancez dans le même ordre qu’au début.

Questions

Question :

Quelle est l’utilité de setDecayFactor ? Modifiez sa valeur et regardez l’impact sur le résultat.

Correction :

Lors du traitement de « tranches » successives du flux, setDecayFactor permet de réduire l’impact des « tranches » précédentes sur la position des centres. La valeur de 1.0 est maximale (impact constant), la valeur de 0.0 est minimale (impact nul des tranches précédentes). Pour des valeurs faibles mais non nulles (par ex. 0.1) vous constaterez une certaine « instabilité » des résultats : les groupes (identifiés par une couleur) peuvent parfois changer d’une tranche à la suivante.

Question :

Qu’affiche vraiment gnuplot dans cet exemple ? Expliquez pourquoi l’évolution de l’affichage est celle que vous constatez.

Correction :

Dans le script prepfiles.sh, l’utilisation de >> donneesGnuplot.txt indique que les nouvelles données étiquettées sont ajoutées aux données affichées précédemment. Au démarrage de l’algorithme de classification automatique de flux, les centres sont aléatoires et donc les étiquettes de groupe affectées aux données ne correspondent pas aux groupes qui avaient été utilisés lors de la génération des données. Au fur et à mesure que de nouvelles données sont traitées et les centres évoluent, les groupes résultants se rapprochent des groupes utilisés pour la génération. Ces nouvelles données sont affichées, dans la vue 2D de l’affichage avec splot, au-dessus des anciennes étiquettes, les couleurs des groupes deviennent donc uniformes malgré la présence dans ces groupes d’anciennes données qui avaient une « mauvaise » étiquette de groupe (chaque étiquette de groupe est représentée par une couleur de la palette).