Travaux pratiques - Fouille de flux de données¶
(une variante de ce TP utilisant le langage Scala)
Références externes utiles :
L’objectif de cette séance de TP est d’introduire l’utilisation de Spark Streaming pour le traitement de données en flux. Nous nous intéressons à la classification automatique de données vectorielles issues d’un flux, voir la documentation Spark Streaming k-means. Nous travaillerons sur des données générées en amont de la séance de travaux pratiques.
L’API Dataset / Dataframe ne permet pas l’utilisation directe d’algorithmes de type Streaming K-means. Pour cette séance de TP nous employons donc l’API RDD.
Récupération des données¶
Pour cette séance de travaux pratiques, nous aurons besoin d’exécuter quelques commandes dans un terminal Linux ou MacOS. Les instructions dépendent selon que vous utilisez pyspark ou JupyterHub.
Pour préparer les répertoires et télécharger les données générées en amont, ouvrez une fenêtre terminal et entrez les commandes suivantes :
%%bash mkdir -p tpflux/data tpflux/stream tpflux/resultats tpflux/points wget -q -nc https://cedric.cnam.fr/vertigo/Cours/RCP216/docs/full.zip -P tpflux/data unzip -q tpflux/data/full.zip -d tpflux/data/ pyspark
Sur les machines de TP du Cnam, si la commande pyspark
n’est pas trouvée, entrez d’abord export PATH="$PATH:/opt/spark/bin"
et ensuite pyspark
.
Sous Windows, créez le répertoire tpflux
et les sous-répertoires data
, stream
, resultats
et points
, téléchargez le fichier de données dans le sous-répertoire data
, décompressez-le avec unzip -q tpflux\data\full.zip -d tpflux\data\
, ouvrez une fenêtre invite de commandes et lancez pyspark
.
La commande unzip full.zip
crée un répertoire full
dans data
, dans lequel se trouvent 1000 fichiers de 100 données tridimensionnelles chacun, qui correspondent aux « tranches » du flux de données traitées par la classification automatique de flux (streaming k-means).
Note
Pour votre information, ces 100.000 données tridimensionnelles dans 5 groupes ont été générées de façon similaire aux données employées dans la séance sur la classification automatique, en utilisant les commandes suivantes en langage Scala dans
spark-shell
lancé dans le répertoiretpflux
(attention, n’entrez pas ces commandes, vous venez de télécharger ces données) :
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.KMeansDataGenerator
// Génération des données
val donneesGenerees = KMeansDataGenerator.generateKMeansRDD(sc, 100000, 5, 3, 1, 1)
val donnees = donneesGenerees.map(s => Vectors.dense(s)).cache()
// Enregistrement des données dans un fichier texte
val donneesTxt = donnees.map(l => l.toString)
donneesTxt.saveAsTextFile("tpflux/data/donnees")
Note
Toujours pour votre information, ces données ont ensuite été transformées : les lignes du fichier tpflux/data/donnees/part-00000
ont été retriées dans un ordre aléatoire avec l’utilitaire linux shuf
et ensuite découpées en groupes de 100 lignes dans des fichiers différents dans le répertoire full/
(attention, n’entrez pas ces commandes, vous avez déjà ces données) :
shuf tpflux/data/donnees/part-00000 > alldatashuffle
mkdir -p tpflux/data/full
split -l 100 -a 3 alldatashuffle tpflux/data/full/
zip tpflux/data/full.zip tpflux/data/full/*
Classification et visualisation des résultats¶
Regardez d’abord la documentation du streaming k-means dans Spark. Dans pyspark ou dans Jupyter, vous pouvez ensuite importer les modules dont nous aurons besoin :
# Importation des librairies
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans
StreamingContext permet de définir la façon dont Spark va gérer le flux. Dans notre cas, nous considérerons des tranches de dix secondes, c’est-à-dire que Spark va réaliser le traitement sur les données reçues dans les dix dernières secondes.
# Création du StreamingContext : flux découpé en "tranches" de 10 secondes,
# chaque tranche sera un RDD
sc = spark.sparkContext
ssc = StreamingContext(sc, 10)
# Lecture du flux de données d'entree : les nouveaux fichiers déposés dans
# "data/stream" seront rassemblés dans un RDD toutes les 10s et traités
trainingData = ssc.textFileStream("tpflux/stream").map(Vectors.parse)
# Paramétrage de Streaming k-means
numClusters = 5
numDimensions = 3
Alpha = 0.5
model = StreamingKMeans().setK(numClusters).setDecayFactor(Alpha) \
.setRandomCenters(numDimensions, weight=1.0, seed=10)
# Indication du flux sur lequel le modèle est appris (les groupes sont trouvés)
model.trainOn(trainingData)
# Indication du flux sur lequel les prédictions sont faites par le modèle
resultats = model.predictOn(trainingData)
# À titre pédagogique pour la visualisation, on sauvegarde sur le disque
# les RDD des points traités et des clusters prédits
trainingData.foreachRDD(lambda rdd: rdd.saveAsTextFile("tpflux/points"))
resultats.foreachRDD(lambda rdd: rdd.saveAsTextFile("tpflux/resultats"))
Nous pouvons maintenant lancer le système de flux de Spark :
ssc.start()
Vous pouvez désormais copier des fichiers du dossier full
dans le répertoire tpflux/stream
. Toutes les 10 secondes, Spark cherche les nouveaux fichiers, les lit et les traite.
Par exemple, nous pouvons copier les trois premiers fichiers avec la commande suivante (à entrer dans la fenêtre de contrôle) :
import os
os.system("cp tpflux/data/full/aa* tpflux/stream")
# Adapter cette commande pour un système Windows ou copier les fichiers manuellement
Le traitement peut prendre quelques temps, on peut vérifier qu’il s’est exécuté lorsque les résultats intermédiaires sont sauvegardés, c’est à dire lorsqu’on trouve des fichiers part-0000X
dans tpflux/resultats/
. Pour le savoir il est possible d’utiliser la commande suivante. Noter que si Jupyter a été lancé par un appel à pyspark
, le résultat de la commande n’est pas affiché dans le cahier Jupyter mais dans la fenêtre dans laquelle pyspark
a été appelé.
!ls tpflux/resultats/
# sous Windows : dir tpflux/resultats/
Nous pouvons charger les résultats pour cette tranche du flux car nous avons sauvegardé les RDD correspondant (saveAsTextFile
) dans les répertoires tpflux/points
et tpflux/resultats
:
# On lit les données utilisées pour trouverles centres
from pyspark.sql.types import DoubleType, StructType, StructField
points = sc.textFile("tpflux/points").map(lambda v: Vectors.parse(v)) \
.map(lambda p: (float(p[0]), float(p[1]), float(p[2])))
champs = [StructField("v"+str(i), DoubleType(), True) for i in range(0, 3)]
pointsdf = spark.createDataFrame(points,StructType(champs))
pointsdf.printSchema()
pointsdf.show(3)
# On lit les centres correspondants
clusters = spark.read.option("inferSchema", True).csv("tpflux/resultats/")
clusters.printSchema()
Pour afficher les résultats nous pouvons nous contenter d’un affichage 2D (les autres paires de coordonnées peuvent aussi être afichées) :
aafficher = pointsdf.toPandas()
aafficher["groupe"] = clusters.toPandas()
print(aafficher.head(3))
aafficher.plot.scatter('v0', 'v1', c='groupe', cmap='coolwarm')
ou, si ipympl
est bien installé, avoir un affichage 3D que nous pouvons tourner avec la souris :
%matplotlib widget
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
# Affichage du nuage de points
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
ax.scatter(aafficher['v0'], aafficher['v1'], aafficher['v2'], c=aafficher['groupe'], cmap='coolwarm')
plt.title("Groupes")
Alternativement, remplacer %matplotlib widget
par %matplotlib inline
permet d’avoir une vue 3D fixe.
Les valeurs des centres des clusters sont accessibles via l’objet lastModel()
qui renvoie le dernier modèle KMeans calculé à partir des données du flux :
model.latestModel().clusterCenters
Question
Copiez/coller de nouveaux fichiers dans le répertoire stream
(manuellement ou en ligne de commande, par exemple tous les fichiers commençant par ab
: cp tpflux/data/full/ab* tpflux/stream
). Patientez le temps que Spark les traite tous puis relancez le chargement des RDD intermédiaires et la visualisation (les trois blocs de code précédents). Que constatez-vous ?
Correction
La visualisation est mise à jour avec la nouvelle tranche du flux. Le modèle est par ailleurs actualisé.
Pour arrêter le processus (arrêt du StreamingContext
), entrez d’abord dans la fenêtre pyspark
:
ssc.stop(False,True)
Question
Consultez la documentation de StreamingContext. À quoi correspondent les deux paramètres False
et True
de la commande précédente ?
Correction
Le premier argument de ssc.stop(...)
indique si l’on doit oui ou non arrêter le contexte d’exécution Spark. Généralement on préfère ne pas l’arrêter afin de continuer à travailler (sinon il faut relancer Spark).
Le deuxième paramètre correspond à l’arrêt « élégant », c’est-à-dire en attendant la fin de toutes les opérations sur le flux.
Pour relancer tout le processus, il est nécessaire de faire d’abord un peu de nettoyage, notamment de vider les répertoires contenant les résultats temporaires :
os.system("rm -r tpflux/stream tpflux/resultats tpflux/points")
os.system("mkdir -p tpflux/stream tpflux/resultats tpflux/points")
Il est ensuite nécessaire de recréer un StreamingContext et un modèle. Pour ce faire, remontez jusqu’au paragraphe débutant par ssc = ...
et relancez les paragraphes précédents dans le même ordre qu’au début.
Par défaut, StreamingKMeans
initialise les centroïdes des groupes en les tirant au hasard dans une distribution suivant la loi normale.
Il est possible de modifier ce comportement en spécifiant explicitement les centres à utiliser.
Par exemple, nous pouvons plutôt choisir des centres initiaux aléatoirement tirés dans la première tranche du jeu de données en remplaçant la ligne model.setRandomCenters(...)
par :
# Centres aléatoires provenant de la première tranche du jeu de données
import numpy as np
randomCenters = np.array([ \
Vectors.dense(1.9694960109392348,-0.4935929095164415,-2.4170814972553543), \
Vectors.dense(-1.8664826111202575,-3.496003219019423,0.6136412963273251), \
Vectors.dense(-0.15369759819355844,-1.1054851590607655,-1.1214291749609204), \
Vectors.dense(1.6853803159957776,-1.5351888498944155,-0.533050844909388), \
Vectors.dense(-2.2844115879924636,-1.6973609655616806,-2.9603827271687932)])
model.setInitialCenters(randomCenters, np.ones(5))
Il est possible de répéter l’apprentissage du k-means avec ces centroïdes initiaux.
Si l’on a déjà une idée de la position des centres, nous pouvons bien entendu utiliser ceux-ci. Dans notre cas, nous connaissons exactement la position des centres réels car nous avons nous-mêmes généré les données. Vous pouvez réaliser le k-means en flux en utilisant les bons centres initiaux pour obtenir un partitionnement « parfait ».
Question
À partir de la visualisation du nuage de points, déterminez visuellement des centres approchés raisonnables. Utilisez ces valeurs pour l’initialisation. Le streaming KMeans converge-t-il plus rapidement ?
Correction
Choisir les points directement depuis la première tranche des observations permet de faire converger KMeans plus rapidement. En effet, par défaut Spark initialise les centres au hasard en les tirant depuis une loi normale. Mais les observations que nous avons ne sont pas normalement distribuées et il faut donc attendre un certain nombre d’itérations pour que les centres se rapprochent de la distribution réelle des données.
Question :
Quelle est l’utilité de setDecayFactor
? Modifiez sa valeur et regardez l’impact sur le résultat.
Correction :
Lors du traitement de « tranches » successives du flux, setDecayFactor
permet de réduire l’impact des « tranches » précédentes sur la position des centres. La valeur de 1.0
est maximale (l’impact ne diminue pas avec les itérations), la valeur de 0.0
est minimale (l’impact des tranches précédentes est nul). Pour des valeurs faibles mais non nulles (par ex. 0.1) vous constaterez une certaine « instabilité » des résultats : les groupes (identifiés par une couleur) peuvent parfois changer d’une tranche à la suivante.