Travaux pratiques - Lecture, écriture et manipulation de DataFrames. Exécution d’applications

(une variante de ce TP utilisant le langage Scala)

Références externes utiles :

L’objectif de cette séance de TP est d’introduire les structures de données locales et distribuées employées pour des données numériques, ainsi que la méthode permettant d’écrire des programmes pour Spark en langage Python et de les lancer en exécution.

Les exemples ci-dessous reprennent, en partie, ceux des documentations en ligne.

Nous vous recommandons d’effectuer cette séance de travaux pratiques, ainsi que les suivantes, avec Jupyter afin de vous familiariser avec l’utilisation de Spark dans cet environnement. Pour ce faire, téléchargez le cahier Jupyter en cliquant sur le bandeau en haut du TP (clic droit > Enregistrer la cible du lien sous) puis importez ce fichier (bouton Téléverser) dans Jupyter.

Si vous êtes inscrit au Cnam, vous avez accès au serveur JupyterHub en passant par votre espace numérique de formation. Une fois sur la page du cours, cliquez sur le lien « Accès à JupyterHub ». Vous serez automatiquement authentifié sur votre espace Jupyter personnel hébergé sur les serveurs du Cnam. Votre répertoire personnel est préservé tant que vous êtes inscrit à au moins un cours du Cnam.

Une fois sur cette interface, vous pouvez y importer la version Jupyter du TP que vous aurez préalablement téléchargée en cliquant sur le bandeau en haut de cette page. Lorsque cela vous est demandé, choisissez le noyau « Python » (ou Python3).

Lecture, écriture et manipulation de DataFrame

Spark est normalement utilisé sur un cluster composé d’un certain nombre de nœuds de calcul. Les données, très volumineuses, sont stockées dans des fichiers distribués en utilisant le système de fichiers HDFS. Lorsqu’un fichier distribué est lu dans un DataFrame, chaque nœud de calcul lit les données correspondantes, stockées localement dans le fichier distribué désigné comme source, et les transfère dans la partie de DataFrame qui se trouvera dans sa mémoire vive. Quand un DataFrame est écrit sur le stockage de masse (disque), chaque nœud de calcul écrit les données de sa partie du DataFrame (qui se trouvent dans sa mémoire vive) sur le disque local, dans le fichier distribué désigné comme destination. Pour indiquer que le fichier source ou destination est un fichier HDFS, on ajoute au chemin utilisé le préfixe hdfs://.

Dans le cadre des TP nous travaillons sur des ordinateurs individuels et des fichiers peu volumineux, le système de fichiers utilisé est le système de fichier local (non distribué). En général, en l’absence de préfixe devant le chemin vers le fichier, c’est le système de fichiers local qui est employé. Le système de fichiers local peut aussi être désigné explicitement avec le préfixe file://.

Lecture directe ou construction de DataFrame

La création de DataFrame à partir de fichiers contenant des données numériques peut être faite suivant plusieurs approches. Pour pouvoir examiner ces différentes approches nous chercherons d’abord quelques fichiers de données.

Sous Linux/MacOS, ouvrez une fenêtre terminal et entrez les commandes suivantes :

# À entrer dans un terminal
mkdir -p tpnum/data  # créer dossier tpnum et sous-dossier data
# Téléchargements des fichiers
wget -nc http://cedric.cnam.fr/vertigo/Cours/RCP216/docs/geysers.txt -P tpnum/data/
wget -nc http://cedric.cnam.fr/vertigo/Cours/RCP216/docs/geyser.txt -P tpnum/data/
wget -nc http://cedric.cnam.fr/vertigo/Cours/RCP216/docs/geyser.csv -P tpnum/data/
wget -nc https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_libsvm_data.txt -P tpnum/data/

Sous Windows, créez un répertoire tpnum\data puis téléchargez-y les fichiers dont les URL sont spécifiées ci-dessus.

A l’aide d’un éditeur de texte ou plus simplement de la commande linux head, examinez le contenu de chacun de ces fichiers.

Lecture de fichier CSV

La première approche consiste à créer directement le Dataset / DataFrame à partir d’un fichier. Considérons d’abord cette approche pour un fichier de format CSV (comma separated values), geyser.csv, qui présente deux valeurs double par ligne, séparées par une virgule :

# Création directe de DataFrame à partir d'un fichier de données en format CSV
geysercsvdf = spark.read.load("tpnum/data/geyser.csv", format="csv")

# examinons le DataFrame geysercsvdf obtenu
geysercsvdf.printSchema()
geysercsvdf.show(5)

Une solution équivalente pour les fichiers CSV :

geysercsvdf = spark.read.csv("tpnum/data/geyser.csv")
geysercsvdf.show(5)

ou encore

geysercsvdf = spark.read.format("csv").load("tpnum/data/geyser.csv")
geysercsvdf.show(5)

Question :

On constate que les noms obtenus pour les colonnes sont génériques (_c0, _c1), alors que les vrais noms des colonnes se retrouvent dans la première ligne de données. Pourquoi ? Avec les options de la fonction read, indiquez lors de la lecture via Spark que l’en-tête du fichier CSV doit être pris en compte pour nommer les colonnes du DataFrame.

Indice : se référer aux exemples de la documentation ou, pour les fichiers CSV en particulier.

Correction

Il suffit de spécifier cela comme une option en ajoutant header="true" aux arguments de l’ordre de lecture (par défaut c’est header="false", il n’y a pas d’en-tête dans le fichier CSV) :

geysercsvdf = spark.read.load("tpnum/data/geyser.csv", format="csv", header="true")
geysercsvdf.show(5)

Une solution équivalente :

geysercsvdf = spark.read.option("header", True).csv("tpnum/data/geyser.csv")
geysercsvdf.show(5)

Question :

Toujours à l’aide des options, créez un DataFrame à partir du fichier « tpnum/data/geyser.txt » dans lequel les séparateurs sont des espaces (« »).

Correction

L’argument correspondant est sep=" " dans les arguments de l’ordre de lecture (par défaut c’est sep=",", séparateur virgule) :

testdf = spark.read.load("tpnum/data/geyser.txt", format="csv", sep=" ")
testdf.show(5)

Une solution équivalente :

testdf = spark.read.option("delimiter", " ").csv("tpnum/data/geyser.txt")
testdf.show(5)

Par défaut, les colonnes lues sont de type chaîne de caractères (string), comme on peut le constater avec :

geysercsvdf.dtypes

Le type de chaque colonne peut être inféré à partir de son contenu, au prix d’une passe supplémentaire sur les données lues (ce qui est coûteux lorsque le volume de données est élevé). Il est nécessaire de le demander avec .option("inferSchema", True) :

geysercsvdf = spark.read.option("header", True).option("inferSchema", True).csv("tpnum/data/geyser.csv")
geysercsvdf.dtypes
testdf = spark.read.option("delimiter", " ").option("inferSchema", True).csv("tpnum/data/geyser.txt")
testdf.dtypes

La méthode .describe() permet de produire des statistiques sur les colonnes numériques du DataFrame :

geysercsvdf.describe().show()
testdf.describe().show()

Noter toutefois qu’il est possible d’appliquer .describe() à une colonne de type chaîne de caractères (string) ; si la plupart des chaînes de caractères peuvent être converties en valeur numériques, .describe() retourne des résultats cohérents :

geysercsvdf = spark.read.csv("tpnum/data/geyser.csv")
geysercsvdf.dtypes
geysercsvdf.show(5)
geysercsvdf.describe().show()

Lecture de vecteurs étiquetés (labeled points)

Un deuxième exemple suivant cette même approche concerne la lecture de vecteurs étiquettés (labeled points) dans un format popularisé par LIBSVM (le même format est utilisé par LIBLINEAR). C’est un format textuel (chaînes de caractères) dans lequel chaque ligne représente un vecteur étiquetté creux dans le format suivant label index1:value1 index2:value2 ..., où les indices (>= 1) sont en ordre croissant. Après chargement, les indices sont convertis pour être >= 0. SparkSession.read.format("libsvm").load() permet la lecture de fichiers suivant ce format et la création d’un DataFrame contenant ces données :

# Création directe de DataFrame à partir d'un fichier de données en format LIBSVM
libsvmdf = spark.read.format("libsvm").load("tpnum/data/sample_libsvm_data.txt")

# examinons le DataFrame libsvmdf obtenu
libsvmdf.printSchema()
libsvmdf.show(5)

# Calcul de statistiques pour la colonne « label »
libsvmdf.describe("label").show()

Lorsque la dimension des vecteurs est connue, il est préférable de la préciser. Sinon la lecture est faite en deux passes et prend plus de temps.

# Création DataFrame à partir d'un fichier en format LIBSVM, en précisant numFeatures
libsvmdf = spark.read.format("libsvm").option("numFeatures","692").load("tpnum/data/sample_libsvm_data.txt")

Question :

Pourquoi il n’est pas possible d’obtenir avec .describe() de statistiques pour la colonne « features » ?

Correction

La colonne « features » n’est pas une colone numérique scalaire mais vectorielle.

Il faut noter que parmi les autres formats directement accessibles avec SparkSession.read.format() on peut trouver json, text, parquet (format binaire compact) ou csv.

Création de DataFrame à partir de RDD

Une seconde approche pour la création de DataFrame à partir de fichiers contenant des données numériques consiste à passer par l’intermédiaire d’un RDD. D’abord, un RDD est obtenu à partir du fichier texte geysers.txt représentant, dans un format adapté à la fonction de lecture .loadVectors() (vous pouvez voir ce format en examinant avec un éditeur de texte le contenu du fichier geysers.txt), 272 observations (1 observation par ligne) pour 2 variables (chaque variable est représentée par une colonne) :

from pyspark.mllib.util import MLUtils

sc = spark.sparkContext

lignes = MLUtils.loadVectors(sc, "tpnum/data/geysers.txt")
lignes.take(5)
lignes.count()

Un DataFrame est ensuite obtenu à partir de ce RDD :

from pyspark.sql.types import DoubleType, StructType, StructField

# Définition du schéma du DataFrame comme une chaîne de caractères
chaineSchema = "duration interval"

# Construction du schéma à partir de la chaîne de caractères
champs = [StructField(field_name, DoubleType(), True) for field_name in chaineSchema.split()]
schema = StructType(champs)

# Construction d'un RDD de tuples à partir du RDD[Vector]
lignesTuples = lignes.map(lambda p: (float(p[0]), float(p[1])))
lignesTuples.take(5)

# Construction du DataFrame à partir du RDD de tuples
lignesdf = spark.createDataFrame(lignesTuples, schema)

# Examen du DataFrame obtenu
lignesdf.printSchema()
lignesdf.show(5)

# Calcul de statistiques pour les deux colonnes du DataFrame
lignesdf.describe().show()

Pour obtenir un DataFrame (le même) via un RDD à partir du fichier geyser.txt dont le format n’est pas adapté à la fonction de lecture loadVectors nous procédons de la manière suivante :

donnees = sc.textFile("tpnum/data/geyser.txt")
donnees.take(5)

# Découpage de chaque ligne (séparateur espace ici), conversion de chaque partie, construction tuple
lignes2 = donnees.map(lambda l: l.split(" ")).map(lambda p: (float(p[0]), float(p[1].strip())))

# on devrait construire d'abord un RDD de tuples à partir de lignes2, or lignes2 est déjà un RDD de tuples
lignesdf = spark.createDataFrame(lignes2, schema)

lignesdf.printSchema()
lignesdf.show(5)

Question :

Expliquer les opérations réalisées pour obtenir le RDD lignes2.

Correction

Le fichier geyser.txt est lu dans un RDD donnees de type texte, dont chaque élément (item) est une ligne de texte. Vous pouvez visualiser le contenu du fichier geyser.txt en entrant, dans une fenêtre terminal, more /home/cloudera/tpnum/data/geyser.txt. Ensuite, lors de la création du RDD lignes2, les opérations suivantes ont lieu : d’abord (premier .map), chaque ligne de texte (chaque item de donnees), désignée implicitement par l, qui est une chaîne de caractères, est partitionnée en « mots » séparés par des espaces avec l.split(' ') pour produire un « tableau » (non explicitement indiqué) de « mots » (chaînes de caractères). Ensuite (second .map), chaque élément de ce tableau (chaque « mot »), correspondant à une séquence de caractères qui représente une valeur numérique (visualisez le fichier geyser.txt), est converti en cette valeur numérique en double précision par float() (le rôle de .strip() est de supprimer les caractères assimilés espace à la fin) et un tuple est construit avec les mots de chaque ligne, tuple qui sera un item du RDD lignes2.

Ecriture de DataFrame

Un DataFrame peut être écrit sur disque dans un fichier CSV en utilisant :

lignesdf.write.csv("tpnum/data/lignesdffichier", header=True)

Dans cet exemple, un répertoire lignesdffichier est créé et, dans ce répertoire, chaque partition du DataFrame lignesdf est stockée dans un fichier (local) différent part-00xxx en format CSV. On trouve également dans le répertoire des fichiers CRC correspondants, ainsi qu’un fichier _SUCCESS si l’enregistrement a abouti.

Pour obtenir un seul fichier dans le système de fichiers local, il faut demander que le DataFrame soit repartitionné en une seule partition avec .repartition(1) ou .coalesce(1) :

lignesdf.repartition(1).write.csv("tpnum/data/lignesDFfichier", header=True)

Vous pouvez constater que, par défaut, un ancien fichier (ou répertoire) n’est pas écrasé par un nouveau, il faut donc supprimer d’abord le répertoire lignesDFfichier ou alors utiliser dans l’instruction d’écriture un nom différent (par ex. lignesDFfichier2).

Des formes équivalentes de l’instruction d’écriture :

lignesdf.repartition(1).write \
        .save("tpnum/data/lignesDFfichier3", header=True, format="csv")
lignesdf.repartition(1).write.format("com.databricks.spark.csv") \
        .option("header",True).save("tpnum/data/lignesDFfichier4")

Pour écrire en écrasant un fichier/répertoire de même nom il est nécessaire d’utiliser le mode overwrite (possible aussi avec les formes équivalentes ci-dessus) :

lignesdf.repartition(1).write.mode("overwrite") \
        .csv("tpnum/data/lignesDFfichier", header=True)

Sélection et manipulation de colonnes de DataFrame

Pour lister les noms des colonnes d’un DataFrame nous pouvons utiliser :

lignesdf.columns   # affiche la liste des colonnes
lignesdf.dtypes    # affiche la liste des colonnes avec, pour chacune, son type

Pour sélectionner une ou plusieurs colonnes nous pouvons procéder de plusieurs façons, comme on peut le voir dans les exemples suivants :

lignesdf.select("duration").show(5)
lignesdf.select(lignesdf.duration).show(5)
lignesdf.select("interval","duration").show(5)
colonnes = ["interval","duration","interval"]    # liste de colonnes, avec doublon
lignesdf.select(*colonnes).show(5)
from pyspark.sql.functions import col
lignesdf.select(col("interval"),col("duration")).show(5)
lignesdf.select(lignesdf.columns[1]).show(5)    # colonne 1 (la deuxième colonne)
lignesdf.select(lignesdf.columns[:1]).show(5)   # colonnes qui précèdent la colonne 1

Pour ajouter une nouvelle colonne obtenue par calcul à partir de colonnes existantes :

lignesdfDureePlus = lignesdf.withColumn("dureePlus",col("duration")*1.1)
lignesdfDureePlus.show(5)

Pour renommer une colonne :

lignesdfColRenommee = lignesdf.withColumnRenamed("interval","int.")
lignesdfColRenommee.show(5)

Pour supprimer une colonne :

lignesSansDureeDF = lignesdf.drop("duration")
lignesSansDureeDF.show(5)

Écrire et exécuter une application dans Spark

Jusqu’ici nous avons utilisé Spark en transmettant une par une des instructions écrites en Python à travers pyspark ou Jupyter. Ce mode de fonctionnement est adapté à l’exécution d’opérations simple et lors de l’étape de mise au point de programmes. Il est toutefois nécessaire de pouvoir écrire des programmes pour Spark (en Python, dans le cadre de nos TP), de les corriger ou modifier éventuellement et de les lancer en exécution.

Écriture de l’application

Nous ferons d’abord une copie locale d’un fichier HTML qui servira de source de données et préparerons la structure des répertoires nécessaires. Pour cela, dans une nouvelle fenêtre terminal entrez les commandes suivantes :

wget -nc http://cedric.cnam.fr/vertigo/Cours/RCP216/tpSparkPython.html -P tpnum/data

Sous Windows, créez le répertoire data dans le répertoire tpnum s’il n’est pas déjà créé et téléchargez-y la page HTML spécifiée (par exemple en la sauvegardant depuis un navigateur).

Lancez un éditeur de texte (KWrite par exemple, dans le menu Applications_pedagogiques, sous-menu Editeurs sur les ordinateurs de la salle de TP) et copiez dans cet éditeur le programme Python suivant :

"""SimpleProg.py"""
from pyspark.sql import SparkSession

myFile = "data/tpSparkPython.html"
spark = SparkSession.builder.appName("SimpleProg").getOrCreate()
myData = spark.read.text(myFile).cache()

nbsql = myData.filter(myData.value.contains("sql")).count()
nbspark = myData.filter(myData.value.contains("spark")).count()

print("Lignes avec spark : %i, lignes avec sql : %i" % (nbspark, nbsql))

spark.stop()

Enregistrez-le dans un fichier SimpleProg.py dans le répertoire tpnum que vous avez créé plus haut.

Ce programme très simple compte le nombre de lignes du fichier tpSparkPython.html qui contiennent « spark » et le nombre de lignes qui contiennent « sql ». Lors de l’utilisation de pyspark un objet SparkSession était créé automatiquement. Lorsqu’on écrit une application autonome il est nécessaire d’initialiser un objet SparkSession explicitement. Pour cela, il est nécessaire d’appeler SparkSession.builder.

Question :

Quel est l’intérêt de l’utilisation de .cache() pour le DataFrame myData ?

Correction

La méthode .cache() permet de conserver le DataFrame myData en mémoire et donc de ne pas avoir à répéter sa lecture plusieurs fois.

Lancement en exécution de l’application

Il est possible de lancer en exécution l’application avec spark-submit, qui permet d’utiliser tous les cluster managers que Spark sait gérer, de façon transparente à l’application. Dans une fenêtre terminal (bash), dans le répertoire tpnum, entrez :

spark-submit --master local SimpleProg.py

Si tout se passe bien, vous devriez obtenir la réponse suivante (certes, noyée entre des lignes INFO) :

...
Lignes avec spark : 27, lignes avec sql : 7
...

L’option transmise à spark-submit est :

  • --master : l’URL du master sur le cluster, ici local qui demande l’exécution de l’application en local sur un seul cœur (local[2] : sur 2 cœurs, local[*]: sur tous les cœurs disponibles) ; voir ici les différentes options.

L’appel de spark-submit a en général la forme suivante (pour une application en Python) :

spark-submit \
    --master <master-url> \
    --deploy-mode <deploy-mode> \
    --conf <key>=<value> \
    ... # autres options
    <application.py> \
    [arguments de l'application]

avec

  • --deploy-mode : s’il faut faire tourner le driver sur un des nœuds du cluster (cluster) ou localement sur un client externe (client) (par défaut client) ;

  • --conf : propriétés de configuration de Spark en format « clé=valeur » ;

  • application.py : chemin vers un .py contenant l’application ; l’URL doit être visible à partir de chaque nœud du cluster ;

  • application-arguments : si nécessaire, arguments à passer à l’application.

La totalité des options, dont certaines spécifiques au cluster manager employé, peuvent être obtenues avec spark-submit --help. Aussi, des informations détaillées sur le processus de lancement et sur l’exécution peuvent être obtenues en ajoutant l’option --verbose lors du lancement de spark-submit.