Travaux pratiques - Fouille de flux de données

Références externes utiles :

L’objectif de cette séance de TP est d’introduire l’utilisation de Spark Streaming pour le traitement de données en flux.

Cette séance portera sur la classification automatique de données vectorielles issues d’un flux de données, voir la documentation Spark Streaming k-means. Nous travaillerons sur des données générées en amont de la séance de travaux pratiques.

L’API Dataset / Dataframe ne permet pas l’utilisation directe d’algorithmes de type Streaming K-means. Pour cette séance de TP nous nous servirons donc de l’API RDD.

Nous allons pour cette séance utiliser Vegas-viz comme outil de visualisation. Avec la gestion dynamique des dépendances pour Jupyter/Toree, il suffit d’utiliser la commande magique %AddDeps pour importer les paquets adéquats :

// Vegas v0.3.11 pour Scala 2.11
// L'option --transitive permet de télécharger également les dépendances de Vegas

%AddDeps org.vegas-viz vegas_2.11 0.3.11 --transitive
%AddDeps org.vegas-viz vegas-spark_2.11 0.3.11 --transitive

Récupération des données

Pour cette séance de travaux pratiques, nous aurons besoin d’exécuter quelques commandes dans un terminal Linux ou MacOS. Les instructions dépendent selon que vous utilisez spark-shell (par exemple, sur les machines de TP) ou JupyterHub.

Nous allons commencer par créer le répertoire contenant les données que nous transmettrons en flux à Spark:

import sys.process._

// Création des dossiers
"mkdir -p tpflux/data tpflux/stream tpflux/resultats tpflux/points" !

// Téléchargement des données
"wget -q -nc https://cedric.cnam.fr/vertigo/Cours/RCP216/docs/full.zip -P tpflux/data" !

// Dézippage du fichier
"unzip -q tpflux/data/full.zip -d tpflux/data/" !

La commande unzip full.zip crée un répertoire full dans data, dans lequel se trouvent 1000 fichiers de 100 données tridimensionnelles chacun, qui correspondent aux « tranches » du flux de données traitées par la classification automatique de flux (streaming k-means).

Note

Pour votre information, ces 100.000 données bidimensionnelles dans 5 groupes ont été générées de façon similaire aux données employées dans la séance sur la classification automatique, en utilisant les commandes suivantes dans spark-shell lancé dans le répertoire ~/tpflux (attention, n’entrez pas ces commandes, vous venez de télécharger ces données) :

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.KMeansDataGenerator

// Génération des données
val donneesGenerees = KMeansDataGenerator.generateKMeansRDD(sc, 100000, 5, 3, 1, 1)
val donnees = donneesGenerees.map(s => Vectors.dense(s)).cache()

// Enregistrement des données dans un fichier texte
val donneesTxt = donnees.map(l => l.toString)
donneesTxt.saveAsTextFile("tpflux/data/donnees")

Note

Toujours pour votre information, ces données ont ensuite été transformées : les lignes du fichier data/donnees/part-00000 ont été retriées dans un ordre aléatoire avec l’utilitaire linux shuf et ensuite découpées en groupes de 100 lignes dans des fichiers différents dans le répertoire full/ (attention, n’entrez pas ces commandes, vous avez déjà ces données) :

shuf data/donnees/part-00000 > alldatashuffle
mkdir -p data/full
split -l 100 -a 3 alldatashuffle data/full/
zip data/full.zip data/full/*

Classification et visualisation des résultats

Regardez d’abord la documentation du streaming k-means dans Spark. Dans spark-shell ou dans Jupyter, vous pouvez ensuite importer les modules dont nous aurons besoin :

// Importation des librairies
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.sql._
import org.apache.spark.sql.types._

StreamingContext permet de définir la façon dont Spark va gérer le flux. Dans notre cas, nous considérerons des tranches de dix secondes, c’est-à-dire que Spark va réaliser le traitement sur les données reçues dans les dix dernières secondes.

// Création du StreamingContext : le flux sera découpés en "tranches" de 10 secondes,
//   chaque tranche sera un RDD
val ssc = new StreamingContext(sc, Seconds(10))

// Lecture du flux de données d'entree : les nouveaux fichiers déposés dans
//   "data/stream" seront rassemblés dans un RDD toutes les 10s et traités
val trainingData = ssc.textFileStream("tpflux/stream").map(Vectors.parse)

// Paramétrage de Streaming k-means
val numDimensions = 3   // données 3D
val numClusters = 5     // recherche de 5 groupes
val model = new StreamingKMeans()  // création d'un modèle
// Initialisation des paramètres du modèle
model.setK(numClusters)
model.setDecayFactor(0.5)   // valeur de alpha
model.setRandomCenters(numDimensions, 1.0)

// Indication du flux sur lequel le modèle est appris (les groupes sont trouvés)
model.trainOn(trainingData)

// Indication du flux sur lequel les prédictions sont faites par le modèle
val resultats = model.predictOn(trainingData)

// À titre pédagogique pour la visualisation, on sauvegarde sur le disque
// les RDD des points traités et des clusters prédits
trainingData.foreachRDD(rdd => rdd.saveAsTextFile("tpflux/points"))
resultats.foreachRDD(rdd => rdd.saveAsTextFile("tpflux/resultats"))

Nous pouvons maintenant lancer le système de flux de Spark :

ssc.start()

Vous pouvez désormais copier des fichiers du dossier full dans le répertoire tpflux/stream. Toutes les 10 secondes, Spark cherche les nouveaux fichiers, les lit et les traite.

Par exemple, nous pouvons copier les trois premiers fichiers avec la commande suivante (à entrer dans la fenêtre de contrôle) :

// Adapter cette commande pour un système Windows (ou copiez les fichiers manuellement)
import sys.process._

"cp tpflux/data/full/aaa tpflux/data/full/aab tpflux/data/full/aac tpflux/stream" !

Le traitement peut prendre quelques temps, on peut vérifier qu’il s’est exécuter lorsque les résultats intermédiaires sont sauvegardés (lorsque le résultat de la commande suivante, entrée dans la fenêtre de contrôle, affiche des fichiers part-0000X) :

"ls tpflux/resultats/" !
// sous Windows : dir tpflux/resultats/

Nous pouvons charger les résultats pour cette tranche du flux car nous avons sauvegardé les RDD correspondant (saveAsTextFile) dans les répertoires tpflux/points et tpflux/resultats :

// On lit les RDD et on créé un DataFrame à deux colonnes
val clusters = sc.textFile("tpflux/resultats/").map(_.toInt)
val points = sc.textFile("tpflux/points").map(p => Vectors.parse(p).toArray)
val predictions = clusters.zip(points).toDF("cluster", "coords")
predictions.printSchema()

Vegas permet de visualiser les clusters ainsi obtenus (en 2 dimensions seulement) :

// Import des bibliothèques de Vegas
implicit val render = vegas.render.ShowHTML(kernel.display.content("text/html", _))

import vegas._
import vegas.data.External._
import vegas.sparkExt._

// Construction du nuage de points
val scatter = predictions.withColumn("x", $"coords".getItem(0)).withColumn("y", $"coords".getItem(1))
Vegas("Données initiales").withDataFrame(scatter).mark(Point).encodeX("x", Quant).encodeY("y", Quant).encodeColor("cluster", Nom).show

Les valeurs des centres des clusters sont accessibles via l’objet lastModel() qui renvoie le dernier modèle KMeans calculé à partir des données du flux :

model.latestModel().clusterCenters

Question

Copiez/coller de nouveaux fichiers dans le répertoire stream (manuellement ou en ligne de commande, par exemple tous les fichiers commençant par ab: cp tpflux/data/full/ab* tpflux/stream). Patientez le temps que Spark les traite tous puis relancez le chargement des RDD intermédiaires et la visualisation (les trois blocs de code précédents). Que constatez-vous ?

Pour arrêter le processus, entrez d’abord dans la fenêtre Spark (arrêt du StreamingContext) :

ssc.stop(false,true)

Question

Consultez la documentation de StreamingContext. À quoi correspondent les deux paramètres false et true de la commande précédente ?

Pour relancer tout le processus, il est nécessaire de faire d’abord un peu de nettoyage, notamment de vider les répertoires contenant les résultats temporaires :

"rm -r tpflux/stream tpflux/resultats tpflux/points " !

"mkdir -p tpflux/stream tpflux/resultats tpflux/points " !

Il est ensuite nécessaire de recréer un StreamingContext et un modèle. Pour ce faire, remontez jusqu’au paragraphe débutant par val ssc = ... et relancez les paragraphes précédents dans le même ordre qu’au début.

Par défaut, StreamingKMeans initialise les centroïdes des groupes en les tirant au hasard dans une distribution suivant la loi normale. Il est possible de modifier ce comportement en spécifiant explicitement les centres à utiliser.

Par exemple, nous pouvons plutôt choisir des centres initiaux aléatoirement tirés dans la première tranche du jeu de données en remplaçant la ligne model.setRandomCenters(...) par :

val randomCenters = Array( // Centres aléatoires provenant de la première tranche du jeu de données
    Vectors.dense(1.9694960109392348,-0.4935929095164415,-2.4170814972553543),
    Vectors.dense(-1.8664826111202575,-3.496003219019423,0.6136412963273251),
    Vectors.dense(-0.15369759819355844,-1.1054851590607655,-1.1214291749609204),
    Vectors.dense(1.6853803159957776,-1.5351888498944155,-0.533050844909388),
    Vectors.dense(-2.2844115879924636,-1.6973609655616806,-2.9603827271687932))
model.setInitialCenters(randomCenters, Array.fill(5)(1.0))

En répétant l’apprentissage du k-means avec ces centroïdes initiaux, vous devriez obtenir des résultats raisonnables.

Si l’on a déjà une idée de la position des centres, nous pouvons bien entendu utiliser ceux-ci. Dans notre cas, nous connaissons exactement la position des centres réels car nous avons nous-mêmes généré les données. Vous pouvez réaliser le k-means en flux en utilisant les bons centres initiaux pour obtenir un partitionnement « parfait ».

Question

À partir de la visualisation du nuage de points, déterminez visuellement des centres approchés raisonnables. Utilisez ces valeurs pour l’initialisation. Le streaming KMeans converge-t-il plus rapidement ?

Question :

Quelle est l’utilité de setDecayFactor ? Modifiez sa valeur et regardez l’impact sur le résultat.