Travaux pratiques - Lecture, écriture et manipulation de DataFrames. Exécution d’applications¶
(une variante de ce TP utilisant le langage Scala)
Références externes utiles :
L’objectif de cette séance de TP est d’introduire les structures de données locales et distribuées employées pour des données numériques, ainsi que la méthode permettant d’écrire des programmes pour Spark en langage Python et de les lancer en exécution.
Les exemples ci-dessous reprennent, en partie, ceux des documentations en ligne.
Nous vous recommandons d’effectuer cette séance de travaux pratiques, ainsi que les suivantes, avec Jupyter afin de vous familiariser avec l’utilisation de Spark dans cet environnement. Pour ce faire, téléchargez le cahier Jupyter en cliquant sur le bandeau en haut du TP (clic droit > Enregistrer la cible du lien sous) puis importez ce fichier (bouton Téléverser) dans Jupyter.
Si vous êtes inscrit au Cnam, vous avez accès au serveur JupyterHub en passant par votre espace numérique de formation. Une fois sur la page du cours, cliquez sur le lien « Accès à JupyterHub ». Vous serez automatiquement authentifié sur votre espace Jupyter personnel hébergé sur les serveurs du Cnam. Votre répertoire personnel est préservé tant que vous êtes inscrit à au moins un cours du Cnam.
Une fois sur cette interface, vous pouvez y importer la version Jupyter du TP que vous aurez préalablement téléchargée en cliquant sur le bandeau en haut de cette page. Lorsque cela vous est demandé, choisissez le noyau « Python » (ou Python3).
Si votre inscription n’a pas encore été validée (ou si JupyterHub ne répond pas) vous pouvez utiliser le serveur Jupyter local sur un ordinateur de la salle de TP. Pour cela, il suffit de le lancer avec jupyter notebook
, dans le répertoire dans lequel vous souhaitez travailler. Une fois démarré Jupyter, vous pouvez y accéder à l’URL http://localhost:8888/.
Il est possible d’installer Spark et Jupyter sur votre machine personnelle en suivant ces instructions d”installation. Vous pouvez nous signaler les éventuelles difficultés rencontrées (après avoir quand même cherché vous-même des solutions dans des forums sur le web). Une fois installé, vous pouvez démarrer Jupyter puis y accéder à l’URL http://localhost:8888/.
Lecture, écriture et manipulation de DataFrame¶
Spark est normalement utilisé sur un cluster composé d’un certain nombre de nœuds de calcul. Les données, très volumineuses, sont stockées dans des fichiers distribués en utilisant le système de fichiers HDFS. Lorsqu’un fichier distribué est lu dans un DataFrame, chaque nœud de calcul lit les données correspondantes, stockées localement dans le fichier distribué désigné comme source, et les transfère dans la partie de DataFrame qui se trouvera dans sa mémoire vive. Quand un DataFrame est écrit sur le stockage de masse (disque), chaque nœud de calcul écrit les données de sa partie du DataFrame (qui se trouvent dans sa mémoire vive) sur le disque local, dans le fichier distribué désigné comme destination. Pour indiquer que le fichier source ou destination est un fichier HDFS, on ajoute au chemin utilisé le préfixe hdfs://
.
Dans le cadre des TP nous travaillons sur des ordinateurs individuels et des fichiers peu volumineux, le système de fichiers utilisé est le système de fichier local (non distribué). En général, en l’absence de préfixe devant le chemin vers le fichier, c’est le système de fichiers local qui est employé. Le système de fichiers local peut aussi être désigné explicitement avec le préfixe file://
.
Lecture directe ou construction de DataFrame¶
La création de DataFrame à partir de fichiers contenant des données numériques peut être faite suivant plusieurs approches. Pour pouvoir examiner ces différentes approches nous chercherons d’abord quelques fichiers de données.
Sous Linux/MacOS, ouvrez une fenêtre terminal et entrez les commandes suivantes :
# À entrer dans un terminal mkdir -p tpnum/data # créer dossier tpnum et sous-dossier data # Téléchargements des fichiers wget -nc http://cedric.cnam.fr/vertigo/Cours/RCP216/docs/geysers.txt -P tpnum/data/ wget -nc http://cedric.cnam.fr/vertigo/Cours/RCP216/docs/geyser.txt -P tpnum/data/ wget -nc http://cedric.cnam.fr/vertigo/Cours/RCP216/docs/geyser.csv -P tpnum/data/ wget -nc https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_libsvm_data.txt -P tpnum/data/
Sous Windows, créez un répertoire tpnum\data
puis téléchargez-y les fichiers dont les URL sont spécifiées ci-dessus.
A l’aide d’un éditeur de texte ou plus simplement de la commande linux head
, examinez le contenu de chacun de ces fichiers.
Lecture de fichier CSV¶
La première approche consiste à créer directement le Dataset / DataFrame à partir d’un fichier. Considérons d’abord cette approche pour un fichier de format CSV (comma separated values), geyser.csv
, qui présente deux valeurs double
par ligne, séparées par une virgule :
# Création directe de DataFrame à partir d'un fichier de données en format CSV
geysercsvdf = spark.read.load("tpnum/data/geyser.csv", format="csv")
# examinons le DataFrame geysercsvdf obtenu
geysercsvdf.printSchema()
geysercsvdf.show(5)
Une solution équivalente pour les fichiers CSV :
geysercsvdf = spark.read.csv("tpnum/data/geyser.csv")
geysercsvdf.show(5)
ou encore
geysercsvdf = spark.read.format("csv").load("tpnum/data/geyser.csv")
geysercsvdf.show(5)
Question :
On constate que les noms obtenus pour les colonnes sont génériques (_c0, _c1
), alors que les vrais noms des colonnes se retrouvent dans la première ligne de données. Pourquoi ? Avec les options de la fonction read
, indiquez lors de la lecture via Spark que l’en-tête du fichier CSV doit être pris en compte pour nommer les colonnes du DataFrame.
Indice : se référer aux exemples de la documentation ou, pour les fichiers CSV en particulier.
Question :
Toujours à l’aide des options, créez un DataFrame à partir du fichier « tpnum/data/geyser.txt » dans lequel les séparateurs sont des espaces (« »).
Par défaut, les colonnes lues sont de type chaîne de caractères (string
), comme on peut le constater avec :
geysercsvdf.dtypes
Le type de chaque colonne peut être inféré à partir de son contenu, au prix d’une passe supplémentaire sur les données lues (ce qui est coûteux lorsque le volume de données est élevé). Il est nécessaire de le demander avec .option("inferSchema", True)
:
geysercsvdf = spark.read.option("header", True).option("inferSchema", True).csv("tpnum/data/geyser.csv")
geysercsvdf.dtypes
testdf = spark.read.option("delimiter", " ").option("inferSchema", True).csv("tpnum/data/geyser.txt")
testdf.dtypes
La méthode .describe()
permet de produire des statistiques sur les colonnes numériques du DataFrame :
geysercsvdf.describe().show()
testdf.describe().show()
Noter toutefois qu’il est possible d’appliquer .describe()
à une colonne de type chaîne de caractères (string
) ; si la plupart des chaînes de caractères peuvent être converties en valeur numériques, .describe()
retourne des résultats cohérents :
geysercsvdf = spark.read.csv("tpnum/data/geyser.csv")
geysercsvdf.dtypes
geysercsvdf.show(5)
geysercsvdf.describe().show()
Lecture de vecteurs étiquetés (labeled points)¶
Un deuxième exemple suivant cette même approche concerne la lecture de vecteurs étiquettés (labeled points) dans un format popularisé par LIBSVM (le même format est utilisé par LIBLINEAR). C’est un format textuel (chaînes de caractères) dans lequel chaque ligne représente un vecteur étiquetté creux dans le format suivant label index1:value1 index2:value2 ...
, où les indices (>= 1) sont en ordre croissant. Après chargement, les indices sont convertis pour être >= 0. SparkSession.read.format("libsvm").load()
permet la lecture de fichiers suivant ce format et la création d’un DataFrame contenant ces données :
# Création directe de DataFrame à partir d'un fichier de données en format LIBSVM
libsvmdf = spark.read.format("libsvm").load("tpnum/data/sample_libsvm_data.txt")
# examinons le DataFrame libsvmdf obtenu
libsvmdf.printSchema()
libsvmdf.show(5)
# Calcul de statistiques pour la colonne « label »
libsvmdf.describe("label").show()
Lorsque la dimension des vecteurs est connue, il est préférable de la préciser. Sinon la lecture est faite en deux passes et prend plus de temps.
# Création DataFrame à partir d'un fichier en format LIBSVM, en précisant numFeatures
libsvmdf = spark.read.format("libsvm").option("numFeatures","692").load("tpnum/data/sample_libsvm_data.txt")
Question :
Pourquoi il n’est pas possible d’obtenir avec .describe()
de statistiques pour la colonne « features » ?
Il faut noter que parmi les autres formats directement accessibles avec SparkSession.read.format()
on peut trouver json
, text
, parquet
(format binaire compact) ou csv
.
Création de DataFrame à partir de RDD¶
Une seconde approche pour la création de DataFrame à partir de fichiers contenant des données numériques consiste à passer par l’intermédiaire d’un RDD. D’abord, un RDD est obtenu à partir du fichier texte geysers.txt
représentant, dans un format adapté à la fonction de lecture .loadVectors()
(vous pouvez voir ce format en examinant avec un éditeur de texte le contenu du fichier geysers.txt
), 272 observations (1 observation par ligne) pour 2 variables (chaque variable est représentée par une colonne) :
from pyspark.mllib.util import MLUtils
sc = spark.sparkContext
lignes = MLUtils.loadVectors(sc, "tpnum/data/geysers.txt")
lignes.take(5)
lignes.count()
Un DataFrame est ensuite obtenu à partir de ce RDD :
from pyspark.sql.types import DoubleType, StructType, StructField
# Définition du schéma du DataFrame comme une chaîne de caractères
chaineSchema = "duration interval"
# Construction du schéma à partir de la chaîne de caractères
champs = [StructField(field_name, DoubleType(), True) for field_name in chaineSchema.split()]
schema = StructType(champs)
# Construction d'un RDD de tuples à partir du RDD[Vector]
lignesTuples = lignes.map(lambda p: (float(p[0]), float(p[1])))
lignesTuples.take(5)
# Construction du DataFrame à partir du RDD de tuples
lignesdf = spark.createDataFrame(lignesTuples, schema)
# Examen du DataFrame obtenu
lignesdf.printSchema()
lignesdf.show(5)
# Calcul de statistiques pour les deux colonnes du DataFrame
lignesdf.describe().show()
Pour obtenir un DataFrame (le même) via un RDD à partir du fichier geyser.txt
dont le format n’est pas adapté à la fonction de lecture loadVectors
nous procédons de la manière suivante :
donnees = sc.textFile("tpnum/data/geyser.txt")
donnees.take(5)
# Découpage de chaque ligne (séparateur espace ici), conversion de chaque partie, construction tuple
lignes2 = donnees.map(lambda l: l.split(" ")).map(lambda p: (float(p[0]), float(p[1].strip())))
# on devrait construire d'abord un RDD de tuples à partir de lignes2, or lignes2 est déjà un RDD de tuples
lignesdf = spark.createDataFrame(lignes2, schema)
lignesdf.printSchema()
lignesdf.show(5)
Question :
Expliquer les opérations réalisées pour obtenir le RDD lignes2
.
Ecriture de DataFrame¶
Un DataFrame peut être écrit sur disque dans un fichier CSV en utilisant :
lignesdf.write.csv("tpnum/data/lignesdffichier", header=True)
Dans cet exemple, un répertoire lignesdffichier
est créé et, dans ce répertoire, chaque partition du DataFrame lignesdf
est stockée dans un fichier (local) différent part-00xxx
en format CSV. On trouve également dans le répertoire des fichiers CRC correspondants, ainsi qu’un fichier _SUCCESS
si l’enregistrement a abouti.
Pour obtenir un seul fichier dans le système de fichiers local, il faut demander que le DataFrame soit repartitionné en une seule partition avec .repartition(1)
ou .coalesce(1)
:
lignesdf.repartition(1).write.csv("tpnum/data/lignesDFfichier", header=True)
Vous pouvez constater que, par défaut, un ancien fichier (ou répertoire) n’est pas écrasé par un nouveau, il faut donc supprimer d’abord le répertoire lignesDFfichier
ou alors utiliser dans l’instruction d’écriture un nom différent (par ex. lignesDFfichier2
).
Des formes équivalentes de l’instruction d’écriture :
lignesdf.repartition(1).write \
.save("tpnum/data/lignesDFfichier3", header=True, format="csv")
lignesdf.repartition(1).write.format("com.databricks.spark.csv") \
.option("header",True).save("tpnum/data/lignesDFfichier4")
Pour écrire en écrasant un fichier/répertoire de même nom il est nécessaire d’utiliser le mode overwrite
(possible aussi avec les formes équivalentes ci-dessus) :
lignesdf.repartition(1).write.mode("overwrite") \
.csv("tpnum/data/lignesDFfichier", header=True)
Sélection et manipulation de colonnes de DataFrame¶
Pour lister les noms des colonnes d’un DataFrame nous pouvons utiliser :
lignesdf.columns # affiche la liste des colonnes
lignesdf.dtypes # affiche la liste des colonnes avec, pour chacune, son type
Pour sélectionner une ou plusieurs colonnes nous pouvons procéder de plusieurs façons, comme on peut le voir dans les exemples suivants :
lignesdf.select("duration").show(5)
lignesdf.select(lignesdf.duration).show(5)
lignesdf.select("interval","duration").show(5)
colonnes = ["interval","duration","interval"] # liste de colonnes, avec doublon
lignesdf.select(*colonnes).show(5)
from pyspark.sql.functions import col
lignesdf.select(col("interval"),col("duration")).show(5)
lignesdf.select(lignesdf.columns[1]).show(5) # colonne 1 (la deuxième colonne)
lignesdf.select(lignesdf.columns[:1]).show(5) # colonnes qui précèdent la colonne 1
Pour ajouter une nouvelle colonne obtenue par calcul à partir de colonnes existantes :
lignesdfDureePlus = lignesdf.withColumn("dureePlus",col("duration")*1.1)
lignesdfDureePlus.show(5)
Pour renommer une colonne :
lignesdfColRenommee = lignesdf.withColumnRenamed("interval","int.")
lignesdfColRenommee.show(5)
Pour supprimer une colonne :
lignesSansDureeDF = lignesdf.drop("duration")
lignesSansDureeDF.show(5)
Écrire et exécuter une application dans Spark¶
Jusqu’ici nous avons utilisé Spark en transmettant une par une des instructions écrites en Python à travers pyspark
ou Jupyter. Ce mode de fonctionnement est adapté à l’exécution d’opérations simple et lors de l’étape de mise au point de programmes. Il est toutefois nécessaire de pouvoir écrire des programmes pour Spark (en Python, dans le cadre de nos TP), de les corriger ou modifier éventuellement et de les lancer en exécution.
Écriture de l’application¶
Nous ferons d’abord une copie locale d’un fichier HTML qui servira de source de données et préparerons la structure des répertoires nécessaires. Pour cela, dans une nouvelle fenêtre terminal entrez les commandes suivantes :
wget -nc http://cedric.cnam.fr/vertigo/Cours/RCP216/tpSparkPython.html -P tpnum/data
Sous Windows, créez le répertoire data
dans le répertoire tpnum
s’il n’est pas déjà créé et téléchargez-y la page HTML spécifiée (par exemple en la sauvegardant depuis un navigateur).
Lancez un éditeur de texte (KWrite
par exemple, dans le menu Applications_pedagogiques
, sous-menu Editeurs
sur les ordinateurs de la salle de TP) et copiez dans cet éditeur le programme Python suivant :
"""SimpleProg.py"""
from pyspark.sql import SparkSession
myFile = "data/tpSparkPython.html"
spark = SparkSession.builder.appName("SimpleProg").getOrCreate()
myData = spark.read.text(myFile).cache()
nbsql = myData.filter(myData.value.contains("sql")).count()
nbspark = myData.filter(myData.value.contains("spark")).count()
print("Lignes avec spark : %i, lignes avec sql : %i" % (nbspark, nbsql))
spark.stop()
Enregistrez-le dans un fichier SimpleProg.py
dans le répertoire tpnum
que vous avez créé plus haut.
Ce programme très simple compte le nombre de lignes du fichier tpSparkPython.html
qui contiennent « spark » et le nombre de lignes qui contiennent « sql ». Lors de l’utilisation de pyspark
un objet SparkSession
était créé automatiquement. Lorsqu’on écrit une application autonome il est nécessaire d’initialiser un objet SparkSession
explicitement. Pour cela, il est nécessaire d’appeler SparkSession.builder
.
Question :
Quel est l’intérêt de l’utilisation de .cache()
pour le DataFrame myData
?
Lancement en exécution de l’application¶
Il est possible de lancer en exécution l’application avec spark-submit
, qui permet d’utiliser tous les cluster managers que Spark sait gérer, de façon transparente à l’application. Dans une fenêtre terminal (bash
), dans le répertoire tpnum
, entrez :
spark-submit --master local SimpleProg.py
Si tout se passe bien, vous devriez obtenir la réponse suivante (certes, noyée entre des lignes INFO) :
...
Lignes avec spark : 27, lignes avec sql : 7
...
L’option transmise à spark-submit
est :
--master
: l’URL dumaster
sur le cluster, icilocal
qui demande l’exécution de l’application en local sur un seul cœur (local[2]
: sur 2 cœurs,local[*]
: sur tous les cœurs disponibles) ; voir ici les différentes options.
L’appel de spark-submit
a en général la forme suivante (pour une application en Python) :
spark-submit \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # autres options
<application.py> \
[arguments de l'application]
avec
--deploy-mode
: s’il faut faire tourner le driver sur un des nœuds du cluster (cluster
) ou localement sur un client externe (client
) (par défautclient
) ;
--conf
: propriétés de configuration de Spark en format « clé=valeur » ;
application.py
: chemin vers un.py
contenant l’application ; l’URL doit être visible à partir de chaque nœud du cluster ;
application-arguments
: si nécessaire, arguments à passer à l’application.
La totalité des options, dont certaines spécifiques au cluster manager employé, peuvent être obtenues avec spark-submit --help
. Aussi, des informations détaillées sur le processus de lancement et sur l’exécution peuvent être obtenues en ajoutant l’option --verbose
lors du lancement de spark-submit
.