.. _chap-tpFouilleFlux: ############################################## Travaux pratiques - Fouille de flux de données ############################################## .. only:: html .. container:: notebook .. image:: _static/zeppelin_classic_logo.png :class: svg-inline `Cahier Zeppelin `_ Références externes utiles : * `Documentation Spark Streaming `_ * `Documentation API Spark en Scala `_ * `Documentation Scala `_ **L'objectif** de cette séance de TP est d'introduire l'utilisation de Spark Streaming pour le traitement de données en flux. Cette séance portera sur la classification automatique de données vectorielles issues d'un flux de données, voir la `documentation Spark Streaming k-means `_. Nous travaillerons sur des données générées en amont de la séance de travaux pratiques. Dans la version 2.2.0 de Spark, l'API *Dataset / Dataframe* ne permet pas l'utilisation directe d'algorithmes de type *Streaming K-means*. Pour cette séance de TP nous nous servirons donc de l'API *RDD*. .. only:: jupyter .. code-block:: scala %spark.dep z.load("org.vegas-viz:vegas_2.11:0.3.11") z.load("org.vegas-viz:vegas-spark_2.11:0.3.11") Récupération des données ========================= Ouvrez une fenêtre terminal (nous l'appellerons « fenêtre de contrôle » dans la suite) et entrez : .. code-block:: bash %%bash mkdir -p tpflux/data cd tpflux/data wget -nc https://cedric.cnam.fr/vertigo/Cours/RCP216/full.zip unzip full.zip Si la commande ``spark-shell`` n'est pas trouvée, entrez d'abord ``export PATH="$PATH:/opt/spark/bin"`` et ensuite ``spark-shell``. La commande ``unzip full.zip`` crée un répertoire ``full`` dans ``data``, dans lequel se trouvent 1000 fichiers de 100 données tridimensionnelles chacun, qui correspondent aux « tranches » du flux de données traitées par la classification automatique de flux (*streaming k-means*). Pour votre information, ces 100.000 données bidimensionnelles dans 5 groupes ont été générées de façon similaire aux données employées dans la `séance sur la classification automatique `_, en utilisant les commandes suivantes dans ``spark-shell`` lancé dans le répertoire ``~/tpflux`` (**attention, n'entrez pas ces commandes, vous venez de télécharger ces données**) : .. code-block:: scala import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.util.KMeansDataGenerator // Génération des données val donneesGenerees = KMeansDataGenerator.generateKMeansRDD(sc, 100000, 5, 2, 5, 1) val donnees = donneesGenerees.map(s => Vectors.dense(s)).cache() // Enregistrement des données dans un fichier texte val donneesTxt = donnees.map(l => l.toString) donneesTxt.saveAsTextFile("data/donnees") Toujours pour votre information, ces données ont ensuite été transformées : les lignes du fichier ``data/donnees/part-00000`` ont été retriées dans un ordre aléatoire avec l'utilitaire linux ``shuf`` et ensuite découpées en groupes de 100 lignes dans des fichiers différents dans le répertoire ``full/`` (**attention, n'entrez pas ces commandes, vous avez déjà ces données**) : .. code-block:: bash $ shuf data/donnees/part-00000 > alldatashuffle $ cd data $ mkdir full $ split -l 100 -a 3 alldatashuffle full/ .. only:: comment Préparation des scripts nécessaires =================================== Pour expérimenter avec la classification automatique de données issues d'un flux, nous avons besoin d'un premier script linux qui prendra un par un les fichiers avec les « tranches » (100 vecteurs tridimensionnels) et les copiera dans un répertoire ``stream``. Le traitement de flux de données de Spark prendra chaque nouveau fichier du répertoire ``stream``, lui appliquera les mêmes opérations (correspondant à une classification automatique incrémentale, voir `streaming k-means `_ et générera des étiquettes de groupe (*cluster*) pour chaque donnée de chaque « tranche ». Afin de nous permettre de visualiser les résultats, le même script linux prendra ces étiquettes et les ajoutera au fichier de données de la « tranche » correspondante, dans un format adapté à la visualisation avec ``gnuplot``. Pour écrire ce script linux, ouvrez un éditeur de texte, copiez dans cet éditeur les lignes suivantes et enregistrez le fichier sous le nom ``prepfiles.sh`` dans le répertoire ``~/tpflux/data`` : .. code-block:: bash dir=$1 # 1er paramètre : répertoire source newdir=$2 # 2ème paramètre : répertoire d'entrée du flux newdir2=$3 # 3ème paramètre : répertoire de préparation pour gnuplot for file in $dir/* # pour chaque fichier source do filename=$(basename $file) # extraire le nom du fichier du nom complet cp $file $newdir/$filename # copier le fichier dans le répertoire flux # copier le fichier, après avoir supprimé le premier et dernier caractère de chaque ligne, # dans le répertoire pour gnuplot cat $file | sed 's/.//;s/.$//' > $newdir2/$filename sleep 10 # attendre 10 secondes # préparer les données pour gnuplot paste --delimiters="," $newdir2/$filename indices/part-00000 >> donneesGnuplot.txt rm -rf indices # préparer une nouvelle écriture de la sortie du flux done Il est ensuite nécessaire de rendre ce fichier ``prepfiles.sh`` exécutable. Pour cela, dans la fenêtre terminal déjà utilisée, entrez : .. code-block:: bash $ chmod a+x prepfiles.sh Les résultats seront affichés avec ``gnuplot`` ; pour que l'affichage puisse être mis à jour après le traitement de chaque nouvelle « tranche » du flux, nous utiliserons ``gnuplot`` avec un fichier script ``show.gp`` dans lequel nous incluerons l'instruction ``reread``. Pour écrire ce script gnuplot, ouvrez un éditeur de texte, copiez dans cet éditeur les lignes suivantes et enregistrez le fichier sous le nom ``show.gp`` dans le répertoire ``~/tpflux/data`` : .. code-block:: bash set datafile separator ',' set palette defined ( 0 "red", 1 "orange", 2 "brown", 3 "green", 4 "blue", 5 "pink", 6 "magenta") splot "donneesGnuplot.txt" using 1:2:3:4 with points lc palette pause 10 reread Vérifiez si ``gnuplot`` est installé sur l'ordinateur sur lequel vous travaillez : entrez ``gnuplot`` dans la fenêtre de contrôle, si le programme est installé alors vous aurez un prompt ``gnuplot>``, quittez en entrant ``quit``. Si ``gnuplot`` n'est pas installé il faudra l'installer, entrez ``sudo yum install gnuplot`` (ou la commande appropriée à votre distribution linux) dans la fenêtre de contrôle. Dans la salle de TP ``gnuplot`` est déjà installé. Nous pouvons maintenant passer à la mise en œuvre de la classification automatique des données en flux. Classification et visualisation des résultats ============================================= Regardez d'abord `streaming k-means `_. Ensuite, lancez spark-shell ou Zeppelin et exécutez le code ci-dessous : .. code-block:: scala // Importation des librairies import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.clustering.StreamingKMeans import org.apache.spark.sql._ import org.apache.spark.sql.types._ .. only:: jupyter **ATTENTION** : si vous utilisez le cluster Zeppelin public du Cnam, remplacez "tpflux" dans tout ce qui suit par "tpflux_votrenom". Sinon vous risquez d'être plusieurs à utiliser en même temps le même répertoire de travail et le TP ne fonctionnera pas correctement. .. code-block:: scala // Création du StreamingContext : le flux sera découpés en "tranches" de 10 secondes, // chaque tranche sera un RDD val ssc = new StreamingContext(sc, Seconds(10)) // Lecture du flux de données d'entree : les nouveaux fichiers déposés dans // "data/stream" seront rassemblés dans un RDD toutes les 10s et traités val trainingData = ssc.textFileStream("tpflux/stream").map(Vectors.parse) // Paramétrage de Streaming k-means val numDimensions = 2 // données 2D val numClusters = 5 // recherche de 5 groupes val model = new StreamingKMeans() // création d'un modèle // Initialisation des paramètres du modèle model.setK(numClusters) model.setDecayFactor(0.5) // valeur de alpha model.setRandomCenters(numDimensions, 1.0) // Indication du flux sur lequel le modèle est appris (les groupes sont trouvés) model.trainOn(trainingData) // Indication du flux sur lequel les prédictions sont faites par le modèle val resultats = model.predictOn(trainingData) // À titre pédagogique pour la visualisation, on sauvegarde sur le disque // les RDD des points traités et des clusters prédits trainingData.foreachRDD(rdd => rdd.saveAsTextFile("tpflux/points")) resultats.foreachRDD(rdd => rdd.saveAsTextFile("tpflux/resultats")) Pour pouvoir démarrer le traitement, dans la fenêtre de contrôle (dans laquelle vous êtes positionné dans le répertoire ``tpflux/data``) entrez (démarrage de la « source » du flux) : .. code-block:: bash %%bash mkdir -p tpflux/stream tpflux/resultats tpflux/points Nous pouvons maintenant lancer le système de flux de Spark : .. code-block:: scala ssc.start() Vous pouvez désormais copier des fichiers du dossier ``full`` dans le répertoire ``tpflux/stream``. Toutes les 10 secondes, Spark va chercher les nouveaux fichiers, les lire et les traiter. Par exemple, nous pouvons copier les trois premiers fichiers : .. code-block:: bash %%bash # Adapter cette commande pour un système Windows (ou copiez les fichiers manuellement) cp tpflux/data/full/{aaa,aab,aac} tpflux/stream Le traitement peut prendre quelques temps, on peut vérifier qu'il s'est exécuter lorsque les résultats intermédiaires sont sauvegardés (lorsque le résultat de la commande suivant affiche des fichiers ``part-0000X``) : .. code-block:: bash %%bash ls tpflux/resultats/ # sous Windows : dir tpflux/resultats/ Nous pouvons charger les résultats pour cette tranche du flux car nous avons sauvegardé les *RDD* correspondant (``saveAsTextFile``) dans les répertoires ``tpflux/points`` et ``tpflux/resultats`` : .. code-block:: scala // On lit les RDD et on créé un DataFrame à deux colonnes val clusters = sc.textFile("tpflux/resultats/").map(_.toInt) val points = sc.textFile("tpflux/points").map(p => Vectors.parse(p).toArray) val predictions = clusters.zip(points).toDF("cluster", "coords") predictions.printSchema() Vegas permet de visualiser les clusters ainsi obtenus : .. code-block:: scala // Import des bibliothèques de Vegas implicit val render = vegas.render.ShowHTML(s => print("%html " + s)) import vegas._ import vegas.data.External._ import vegas.sparkExt._ // Construction du nuage de points val scatter = predictions.withColumn("x", $"coords".getItem(0)).withColumn("y", $"coords".getItem(1)) Vegas("Données initiales").withDataFrame(scatter).mark(Point).encodeX("x", Quant).encodeY("y", Quant).encodeColor("cluster", Nom).show Les valeurs des centres des clusters sont accessibles via l'objet ``lastModel()`` qui renvoie le dernier modèle *KMeans* calculé à partir des données du flux : .. code-block:: scala model.latestModel().clusterCenters .. admonition:: Question Copiez/coller de nouveaux fichiers dans le répertoire ``stream`` (manuellement ou en ligne de commande, par exemple tous les fichiers commençant par ``ab``: ``cp tpflux/data/full/ab* tpflux/stream``). Patientez le temps que Spark les traite tous puis relancez le chargement des *RDD* intermédiaires et la visualisation (les trois blocs de code précédents). Que constatez-vous ? .. ifconfig:: tpscala in ('public') .. admonition:: Correction La visualisation est mise à jour avec la nouvelle tranche du flux. Le modèle est par ailleurs actualisé. Pour arrêter le processus, entrez d'abord dans la fenêtre Spark (arrêt du ``StreamingContext``) : .. code-block:: scala ssc.stop(false,true) .. admonition:: Question Consultez la `documentation de StreamingContext `_. À quoi correspondent les deux paramètres ``false`` et ``true`` de la commande précédente ? .. ifconfig:: tpscala in ('public') .. admonition:: Correction Le premier argument de ``ssc.stop(...)`` indique si l'on doit oui ou non arrêter le contexte d'exécution Spark. Généralement on préfère que non afin de continuer à travailler (sinon il faut relancer Spark). Le deuxième paramètre correspond à l'arrêt élégant, c'est-à-dire en attendant la fin de toutes les opérations sur le flux. Pour relancer tout le processus, il est nécessaire de faire d'abord un peu de nettoyage, notamment de vider les répertoires contenant les résultats temporaires : .. code-block:: bash %%bash rm tpflux/stream/* ; rm tpflux/resultats/*; rm tpflux/points/*; Il est ensuite nécessaire de recréer un StreamingContext et un modèle. Pour ce faire, remontez jusqu'au paragraphe débutant par ``val ssc = ...`` et relancez les paragraphes précédents dans le même ordre qu'au début. Par défaut, ``StreamingKMeans`` initialise les centroïdes des groupes en les tirant au hasard dans une distribution suivant la loi normale. Il est possible de modifier ce comportement en spécifiant explicitement les centres à utiliser. Par exemple, nous pouvons plutôt choisir des centres initiaux aléatoirement tirés dans la première tranche du jeu de données en remplaçant la ligne ``model.setRandomCenters(...)`` par : .. code-block:: scala val randomCenters = Array( // Centres aléatoires provenant de la première tranche du jeu de données Vectors.dense(-0.44220413761057725,-0.5584771653546362), Vectors.dense(0.15052315935183933,0.29710547691360334), Vectors.dense(0.1068784297084591,0.40327750843541743), Vectors.dense(0.13651858564781957,0.36766728799972975), Vectors.dense(-0.42636087410514484,-0.4849018049246935)) model.setInitialCenters(randomCenters, Array.fill(5)(1.0)) En répétant l'apprentissage du k-means avec ces centroïdes initiaux, vous devriez obtenir des résultats raisonnables. Si l'on a déjà une idée de la position des centres, nous pouvons bien entendu utiliser ceux-ci. Dans notre cas, nous connaissons exactement la position des centres réels car nous avons nous-mêmes généré les données. Vous pouvez réaliser le k-means en flux en utilisant les bons centres initiaux pour obtenir un partitionnement « parfait ». .. admonition:: Question À partir de la visualisation du nuage de points, déterminez visuellement des centres approchés raisonnables. Utilisez ces valeurs pour l'initialisation. Le streaming KMeans converge-t-il plus rapidement ? .. ifconfig:: tpscala in ('private') .. only:: jupyter .. code-block:: scala Questions ========= .. admonition:: Question : Quelle est l'utilité de ``setDecayFactor`` ? Modifiez sa valeur et regardez l'impact sur le résultat. .. ifconfig:: tpscala in ('public') .. admonition:: Correction : Lors du traitement de « tranches » successives du flux, ``setDecayFactor`` permet de réduire l'impact des « tranches » précédentes sur la position des centres. La valeur de ``1.0`` est maximale (impact constant), la valeur de ``0.0`` est minimale (impact nul des tranches précédentes). Pour des valeurs faibles mais non nulles (par ex. 0.1) vous constaterez une certaine « instabilité » des résultats : les groupes (identifiés par une couleur) peuvent parfois changer d'une tranche à la suivante. .. only:: comment .. admonition:: Question : Qu'affiche vraiment ``gnuplot`` dans cet exemple ? Expliquez pourquoi l'évolution de l'affichage est celle que vous constatez. .. ifconfig:: tpscala in ('public') .. admonition:: Correction : Dans le script ``prepfiles.sh``, l'utilisation de ``>> donneesGnuplot.txt`` indique que les nouvelles données étiquettées sont *ajoutées* aux données affichées précédemment. Au démarrage de l'algorithme de classification automatique de flux, les centres sont aléatoires et donc les étiquettes de groupe affectées aux données ne correspondent pas aux groupes qui avaient été utilisés lors de la génération des données. Au fur et à mesure que de nouvelles données sont traitées et les centres évoluent, les groupes résultants se rapprochent des groupes utilisés pour la génération. Ces nouvelles données sont affichées, dans la vue 2D de l'affichage avec ``splot``, au-dessus des anciennes étiquettes, les couleurs des groupes deviennent donc uniformes malgré la présence dans ces groupes d'anciennes données qui avaient une « mauvaise » étiquette de groupe (chaque étiquette de groupe est représentée par une couleur de la palette).