Travaux pratiques - Fouille de flux de données

(une variante de ce TP utilisant le langage Scala)

Références externes utiles :

L’objectif de cette séance de TP est d’introduire l’utilisation de Spark Streaming pour le traitement de données en flux. Nous nous intéressons à la classification automatique de données vectorielles issues d’un flux, voir la documentation Spark Streaming k-means. Nous travaillerons sur des données générées en amont de la séance de travaux pratiques.

L’API Dataset / Dataframe ne permet pas l’utilisation directe d’algorithmes de type Streaming K-means. Pour cette séance de TP nous employons donc l’API RDD.

Récupération des données

Pour cette séance de travaux pratiques, nous aurons besoin d’exécuter quelques commandes dans un terminal Linux ou MacOS. Les instructions dépendent selon que vous utilisez pyspark (par exemple, sur les machines de TP) ou JupyterHub.

Pour préparer les répertoires et télécharger les données générées en amont, ouvrez une fenêtre terminal et entrez les commandes suivantes :

%%bash
mkdir -p tpflux/data tpflux/stream tpflux/resultats tpflux/points
wget -q -nc https://cedric.cnam.fr/vertigo/Cours/RCP216/docs/full.zip -P tpflux/data
unzip -q tpflux/data/full.zip -d tpflux/data/
pyspark

Sur les machines de TP du Cnam, si la commande pyspark n’est pas trouvée, entrez d’abord export PATH="$PATH:/opt/spark/bin" et ensuite pyspark.

Sous Windows, créez le répertoire tpflux et les sous-répertoires data, stream, resultats et points, téléchargez le fichier de données dans le sous-répertoire data, décompressez-le avec unzip -q tpflux\data\full.zip -d tpflux\data\, ouvrez une fenêtre invite de commandes et lancez pyspark.

La commande unzip full.zip crée un répertoire full dans data, dans lequel se trouvent 1000 fichiers de 100 données tridimensionnelles chacun, qui correspondent aux « tranches » du flux de données traitées par la classification automatique de flux (streaming k-means).

Note

Pour votre information, ces 100.000 données tridimensionnelles dans 5 groupes ont été générées de façon similaire aux données employées dans la séance sur la classification automatique, en utilisant les commandes suivantes en langage Scala dans spark-shell lancé dans le répertoire tpflux (attention, n’entrez pas ces commandes, vous venez de télécharger ces données) :

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.KMeansDataGenerator

// Génération des données
val donneesGenerees = KMeansDataGenerator.generateKMeansRDD(sc, 100000, 5, 3, 1, 1)
val donnees = donneesGenerees.map(s => Vectors.dense(s)).cache()

// Enregistrement des données dans un fichier texte
val donneesTxt = donnees.map(l => l.toString)
donneesTxt.saveAsTextFile("tpflux/data/donnees")

Note

Toujours pour votre information, ces données ont ensuite été transformées : les lignes du fichier tpflux/data/donnees/part-00000 ont été retriées dans un ordre aléatoire avec l’utilitaire linux shuf et ensuite découpées en groupes de 100 lignes dans des fichiers différents dans le répertoire full/ (attention, n’entrez pas ces commandes, vous avez déjà ces données) :

shuf tpflux/data/donnees/part-00000 > alldatashuffle
mkdir -p tpflux/data/full
split -l 100 -a 3 alldatashuffle tpflux/data/full/
zip tpflux/data/full.zip tpflux/data/full/*

Classification et visualisation des résultats

Regardez d’abord la documentation du streaming k-means dans Spark. Dans pyspark ou dans Jupyter, vous pouvez ensuite importer les modules dont nous aurons besoin :

# Importation des librairies
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans

StreamingContext permet de définir la façon dont Spark va gérer le flux. Dans notre cas, nous considérerons des tranches de dix secondes, c’est-à-dire que Spark va réaliser le traitement sur les données reçues dans les dix dernières secondes.

# Création du StreamingContext : flux découpé en "tranches" de 10 secondes,
#   chaque tranche sera un RDD
sc = spark.sparkContext
ssc = StreamingContext(sc, 10)

# Lecture du flux de données d'entree : les nouveaux fichiers déposés dans
#   "data/stream" seront rassemblés dans un RDD toutes les 10s et traités
trainingData = ssc.textFileStream("tpflux/stream").map(Vectors.parse)

# Paramétrage de Streaming k-means
numClusters = 5
numDimensions = 3
Alpha = 0.5
model = StreamingKMeans().setK(numClusters).setDecayFactor(Alpha) \
                         .setRandomCenters(numDimensions, weight=1.0, seed=10)

# Indication du flux sur lequel le modèle est appris (les groupes sont trouvés)
model.trainOn(trainingData)

# Indication du flux sur lequel les prédictions sont faites par le modèle
resultats = model.predictOn(trainingData)

# À titre pédagogique pour la visualisation, on sauvegarde sur le disque
# les RDD des points traités et des clusters prédits
trainingData.foreachRDD(lambda rdd: rdd.saveAsTextFile("tpflux/points"))
resultats.foreachRDD(lambda rdd: rdd.saveAsTextFile("tpflux/resultats"))

Nous pouvons maintenant lancer le système de flux de Spark :

ssc.start()

Vous pouvez désormais copier des fichiers du dossier full dans le répertoire tpflux/stream. Toutes les 10 secondes, Spark cherche les nouveaux fichiers, les lit et les traite.

Par exemple, nous pouvons copier les trois premiers fichiers avec la commande suivante (à entrer dans la fenêtre de contrôle) :

import os
os.system("cp tpflux/data/full/aa* tpflux/stream")
# Adapter cette commande pour un système Windows ou copier les fichiers manuellement

Le traitement peut prendre quelques temps, on peut vérifier qu’il s’est exécuté lorsque les résultats intermédiaires sont sauvegardés, c’est à dire lorsqu’on trouve des fichiers part-0000X dans tpflux/resultats/. Pour le savoir il est possible d’utiliser la commande suivante. Noter que si Jupyter a été lancé par un appel à pyspark, le résultat de la commande n’est pas affiché dans le cahier Jupyter mais dans la fenêtre dans laquelle pyspark a été appelé.

!ls tpflux/resultats/
# sous Windows : dir tpflux/resultats/

Nous pouvons charger les résultats pour cette tranche du flux car nous avons sauvegardé les RDD correspondant (saveAsTextFile) dans les répertoires tpflux/points et tpflux/resultats :

# On lit les données utilisées pour trouverles centres
from pyspark.sql.types import DoubleType, StructType, StructField
points = sc.textFile("tpflux/points").map(lambda v: Vectors.parse(v)) \
           .map(lambda p: (float(p[0]), float(p[1]), float(p[2])))
champs = [StructField("v"+str(i), DoubleType(), True) for i in range(0, 3)]
pointsdf = spark.createDataFrame(points,StructType(champs))
pointsdf.printSchema()
pointsdf.show(3)

# On lit les centres correspondants
clusters = spark.read.option("inferSchema", True).csv("tpflux/resultats/")
clusters.printSchema()

Pour afficher les résultats nous pouvons nous contenter d’un affichage 2D (les autres paires de coordonnées peuvent aussi être afichées) :

aafficher = pointsdf.toPandas()
aafficher["groupe"] = clusters.toPandas()
print(aafficher.head(3))
aafficher.plot.scatter('v0', 'v1', c='groupe', cmap='coolwarm')

ou, si ipympl est bien installé, avoir un affichage 3D que nous pouvons tourner avec la souris :

%matplotlib widget
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

# Affichage du nuage de points
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
ax.scatter(aafficher['v0'], aafficher['v1'], aafficher['v2'], c=aafficher['groupe'], cmap='coolwarm')
plt.title("Groupes")

Alternativement, remplacer %matplotlib widget par %matplotlib inline permet d’avoir une vue 3D fixe.

Les valeurs des centres des clusters sont accessibles via l’objet lastModel() qui renvoie le dernier modèle KMeans calculé à partir des données du flux :

model.latestModel().clusterCenters

Question

Copiez/coller de nouveaux fichiers dans le répertoire stream (manuellement ou en ligne de commande, par exemple tous les fichiers commençant par ab: cp tpflux/data/full/ab* tpflux/stream). Patientez le temps que Spark les traite tous puis relancez le chargement des RDD intermédiaires et la visualisation (les trois blocs de code précédents). Que constatez-vous ?

Pour arrêter le processus (arrêt du StreamingContext), entrez d’abord dans la fenêtre pyspark :

ssc.stop(False,True)

Question

Consultez la documentation de StreamingContext. À quoi correspondent les deux paramètres False et True de la commande précédente ?

Pour relancer tout le processus, il est nécessaire de faire d’abord un peu de nettoyage, notamment de vider les répertoires contenant les résultats temporaires :

os.system("rm -r tpflux/stream tpflux/resultats tpflux/points")
os.system("mkdir -p tpflux/stream tpflux/resultats tpflux/points")

Il est ensuite nécessaire de recréer un StreamingContext et un modèle. Pour ce faire, remontez jusqu’au paragraphe débutant par ssc = ... et relancez les paragraphes précédents dans le même ordre qu’au début.

Par défaut, StreamingKMeans initialise les centroïdes des groupes en les tirant au hasard dans une distribution suivant la loi normale. Il est possible de modifier ce comportement en spécifiant explicitement les centres à utiliser.

Par exemple, nous pouvons plutôt choisir des centres initiaux aléatoirement tirés dans la première tranche du jeu de données en remplaçant la ligne model.setRandomCenters(...) par :

# Centres aléatoires provenant de la première tranche du jeu de données
import numpy as np
randomCenters = np.array([ \
    Vectors.dense(1.9694960109392348,-0.4935929095164415,-2.4170814972553543), \
    Vectors.dense(-1.8664826111202575,-3.496003219019423,0.6136412963273251), \
    Vectors.dense(-0.15369759819355844,-1.1054851590607655,-1.1214291749609204), \
    Vectors.dense(1.6853803159957776,-1.5351888498944155,-0.533050844909388), \
    Vectors.dense(-2.2844115879924636,-1.6973609655616806,-2.9603827271687932)])
model.setInitialCenters(randomCenters, np.ones(5))

Il est possible de répéter l’apprentissage du k-means avec ces centroïdes initiaux.

Si l’on a déjà une idée de la position des centres, nous pouvons bien entendu utiliser ceux-ci. Dans notre cas, nous connaissons exactement la position des centres réels car nous avons nous-mêmes généré les données. Vous pouvez réaliser le k-means en flux en utilisant les bons centres initiaux pour obtenir un partitionnement « parfait ».

Question

À partir de la visualisation du nuage de points, déterminez visuellement des centres approchés raisonnables. Utilisez ces valeurs pour l’initialisation. Le streaming KMeans converge-t-il plus rapidement ?

Question :

Quelle est l’utilité de setDecayFactor ? Modifiez sa valeur et regardez l’impact sur le résultat.