Travaux pratiques - Introduction à Spark¶
(une variante de ce TP utilisant le langage Scala)
Références externes utiles :
L’objectif de cette première séance de TP est d’introduire l’interpréteur de commandes de Spark en langage Python, quelques opérations de base sur les structures de données distribuées que sont les DataFrame, ainsi que quelques notions simples et indispensables concernant le langage Python.
Pour les séances de travaux pratiques, Spark est installé sur un système d’exploitation Linux. Si vous n’êtes pas familier avec les commandes Linux, il est utile de consulter au préalable ce site.
Les exemples ci-dessous reprennent, en partie, ceux des documentations en ligne de Spark.
Dans les salles de travaux pratiques (TP), lors du démarrage de l’ordinateur, choisissez la configuration openSUSE
. Après l’ouverture de session, ouvrez une fenêtre de type terminal (par exemple, Konsole ou Qterminal). Dans cette fenêtre vous pouvez entrer des commandes Linux qui sont traitées par un interpréteur de commandes spécifiques (bash
dans le cas particulier des salles de TP, terme générique shell
).
Dans les salles, Spark est installé directement dans Linux (distribution openSUSE
) et, dans la suite, certaines commandes système sont spécifiques à cette configuration. Si vous installez Spark d’une autre façon (machine virtuelle ou machine personnelle), il sera peut-être nécessaire d’adapter les commandes système.
Si vous suivez ce cours en formation à distance ou si vous souhaitez travailler chez vous (par exemple pour réaliser le projet de cette unité d’enseignement), il vous sera nécessaire d’installer Spark sur un ordinateur auquel vous avez accès en permanence. Suivez ces instructions d”installation de Spark et signalez les éventuelles difficultés rencontrées (après avoir cherché vous-même des solutions dans des forums sur le web).
Spark : concepts de base avec exemples¶
Il est possible d’utiliser Spark à partir des langages Java, Scala, Python ou (dans une moindre mesure) R
à travers une interface en ligne de commandes (en Scala ou Python), pratique aussi bien pour des tests interactifs que pour l’étape de mise au point d’une nouvelle application,
en écrivant (et en exécutant ensuite) un programme (voir la séance de TP suivante).
Certaines librairies ayant été développées seulement en Scala, une application écrite en Java ou en Python doit les appeler pour employer leurs fonctionnalités. Dans ces séances de TP nous nous servirons exclusivement de Python.
Lancement de l’interpréteur de commandes en Python et opérations simples¶
Pour ce premier TP, nous utiliserons l’interpréteur de commandes pyspark
. Il s’agit d’une invite de commandes interactive permettant de communiquer directement avec un cluster Spark local.
Nous allons travailler avec un fichier texte contenant la première partie du livre Les Malheurs de Sophie de la comtesse de Ségur, téléchargeable ici. Commençons par ouvrir un terminal (Windows ou MacOS/Linux).
(attention : les commandes qui suivent %%bash
sont à rentrer dans une fenêtre terminal ou invite de commandes)
%%bash
mkdir tpintro # Créé un dossier nommé tpintro
wget -nc https://cedric.cnam.fr/vertigo/Cours/RCP216/sophie.txt -P tpintro/
# wget permet de télécharger un fichier
%%bash
mkdir tpintro # Créé un dossier nommé tpintro
curl https://cedric.cnam.fr/vertigo/Cours/RCP216/sophie.txt -o tpintro/sophie.txt
# curl permet de télécharger un fichier
Téléchargez le fichier sophie.txt
puis déplacez-le dans un répertoire tpintro
. En ligne de commande, déplacez-vous dans ce dossier (cd chemin\vers\tpintro\
).
Lancez ensuite dans cette fenêtre l’interpréteur de commandes pyspark
en entrant
pyspark
Les commandes suivantes précédées par %%bash
sont à entrer dans une fenêtre terminal (invite de commandes), celles non précédées par %%bash
dans l’interpréteur pyspark
. Sous Linux/MacOS, le répertoire courant d’une fenêtre terminal peut être connu avec la commande pwd
. Le répertoire courant pour les commandes qui sont entrées dans pyspark
est celui dans lequel pyspark
a été lancé. Pour le connaître vous pouvez entrer dans pyspark
:
import os
os.system("pwd")
Si un fichier à lire n’est pas dans le répertoire courant, il faut précéder son nom par le chemin relatif par rapport au répertoire courant.
Note
Si vous avez fait une faute de frappe ou de recopie et ne comprenez pas ce qui s’affiche dans la fenêtre, consultez le message d’erreur qui s’affiche afin de comprendre le problème. Les erreurs communes sont des fautes de frappe dans le nom des variables et des erreurs dans le syntaxe de python. En cas de blocage, vous pouvez faire appel à l’enseignant.
Dataset et DataFrame¶
Un Dataset est une collection distribuée de données. Il peut être vu comme une évolution conceptuelle des RDD (Resilient Distributed DataFrames), historiquement la première structure de données distribuée employée par Spark. Un DataFrame est un Dataset organisé en colonnes qui portent des noms, comme les tables d’une base de données. Avec l’interface de programmation en python, le type DataFrame
est simplement l’alias du type Dataset[Row]
.
Il peut être utile de lire ces explications préparées par les créateurs de Spark pour mieux comprendre l’intérêt de chacune des interfaces de programmation (API) RDD et Dataset / DataFrame.
Le lancement de pyspark
crée implicitement l’instance spark
de SparkSession
qui comporte un certain nombre d’informations de configuration (par ex., indique à Spark comment accéder à un cluster pour exécuter les commandes) et qui peut être employée directement. Dans d’autres cas, cette instance doit être créée explicitement avec
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
Créez maintenant un DataFrame à partir du fichier texte sophie.txt
:
texteSophie = spark.read.text("tpintro/sophie.txt")
Dans la configuration actuelle de Spark dans la salle de travaux pratiques, c’est le système de fichiers (filesystem) local et non HDFS (distribué) qui est utilisé par défaut. Si vous entrez simplement .text("tpintro/sophie.txt")
, Spark cherchera le fichier dans le système de fichiers par défaut, en suivant le chemin spécifié. Avec le préfixe file://
on indique que le fichier à lire est sur le système de fichiers local, avec le préfixe hdfs://
on indique qu’il s’agit plutôt de HDFS. L’utilisation d’un de ces préfixes exige l’emploi du chemin absolu vers le fichier correspondant.
Visualisez le fichier sophie.txt
. Pour cela, il faut ouvrir une autre fenêtre terminal. Allez ensuite dans le répertoire où se trouve le fichier sophie.txt
et examinez-le :
%%bash
cd tpintro
cat sophie.txt
%%bash
cd tpintro
type sophie.txt
La commande Linux/MacOS cat
(sous Windows, type
) permet d’afficher le contenu d’un fichier, en l’occurrence ici du texte.
Il est possible d’appliquer aux DataFrames des actions, qui retournent des valeurs, et des transformations, qui modifient les DataFrames ou en produisent de nouveaux.
Quelques actions simples :
texteSophie.count() # retourne le nombre de lignes dans ce DataFrame
texteSophie.first() # retourne les première ligne de ce DataFrame
texteSophie.take(5) # retourne les 5 premières lignes de ce DataFrame
texteSophie.collect() # retourne sur le *driver* le contenu complet de ce DataFrame et en affiche une partie à la console
Avec .collect()
, lorsque le DataFrame en question est distribué, toutes ses données situées sur les nœuds de calcul sont d’abord regroupées sur le nœud driver. Cette opération très coûteuse si le DataFrame est grand, et qui peut engendrer des débordements de la mémoire sur le driver, donc à éviter autant que possible. Ensuite, une petite partie des données est affichée à la console.
Question :
À quoi correspond une ligne de ce DataFrame ?
Correction
Une ligne de ce DataFrame correspond à une ligne du fichier sophie.txt
. On peut aisément le vérifier en affichant les premières lignes du DataFrame :
texteSophie.show(5)
L’exécution d’une action comme count
sur un très grand DataFrame se déroule de la manière suivante : le DataFrame est composé de fragments, chacun stocké sur un nœud de calcul ; chaque fragment contient un (grand) nombre de lignes ; le programme driver de Spark envoie à chaque nœud le traitement à faire ; chaque nœud de calcul exécute le traitement sur le fragment local et transmet les résultats au driver. Pour d’autres actions (comme first
) seul un fragment est nécessaire donc un seul nœud est sollicité.
Quelques transformations simples :
Construire un DataFrame
lignesAvecPoupee
comprenant seulement les lignes detexteSophie
qui contiennent « poupée », retourner un tableau avec ses 2 premières lignes :
lignesAvecPoupee = texteSophie.filter(texteSophie.value.contains("poupée"))
lignesAvecPoupee.show(2) # affiche les 2 premières lignes de ce DataFrame
Construire un DataFrame
longueursLignes
composé des longueurs des lignes detexteSophie
, retourner un tableau avec ses 5 premières lignes :
from pyspark.sql.functions import * # pour utiliser fonctions comme length()
longueursLignes = texteSophie.select(length(texteSophie.value))
longueursLignes.show(5) # affiche les 5 premières lignes de ce DataFrame
Dans ces exemples, l’expression texteSophie.value
retourne le contenu de l’unique colonne de texteSophie
, appelée value
.
L’exécution d’une transformation sur un très grand DataFrame se déroule de la manière suivante : le DataFrame est composé de fragments, chacun stocké sur un nœud de calcul ; le programme driver de Spark envoie à chaque nœud le traitement à faire ; chaque nœud de calcul exécute le traitement sur le fragment local et conserve les résultats localement.
Nous pouvons constater que l’unique colonne de longueursLignes
a un nom complexe. Il peut être utile de renommer une colonne pour la manipuler plus facilement, per exemple :
longueursLignes.select(longueursLignes['length(value)'].alias("lengths")).show(5)
Enchaînements et évaluation « paresseuse »¶
Transformations et actions peuvent s’enchaîner :
Question :
Combien de lignes contiennent poupée
? Écrire la commande nécessaire dans pyspark
et vérifier dans un terminal (bash
) Linux.
Correction
# .filter() permet de ne garder que les lignes vérifiant la condition désirée
# .count() permet de compter le nombre de lignes
texteSophie.filter(texteSophie.value.contains("poupée")).count()
Vous pouvez vérifier ce résultat de différentes façons, par exemple en utilisant un éditeur de texte (KWrite sur les machines de TP) pour compter les lignes. Il est toutefois possible d’obtenir cette valeur directement depuis la ligne de commande :
Dans un terminal :
%%bash
cd tpintro
grep poupée sophie.txt | wc -l
grep poupée sophie.txt
renvoie les lignes du fichier d’entrée (ici sophie.txt
envoyé par la commande précédente) qui contiennent le « motif » (pattern) poupée
. Le caractère |
indique que wc so la sortie de la commande qui précède ne doit pas être envoyée à la console mais transférée à la commande qui suit (c’est un enchaînement de commandes ou pipe Linux). Ces lignes sont donc envoyées vers la commande Linux wc -l
qui affiche le nombre total de lignes du fichier reçu en entrée (ici, les lignes de sophie.txt
contenant poupée
).
%%bash
cd tpintro
findstr poupée sophie.txt | find /c /v ""
findstr string file
recherche la chaîne de caractères string
dans le fichier file
. Le caractère |
indique que la sortie de la commande qui précède ne doit pas être retourné à la console mais envoyé vers la commande qui suit (c’est un enchaînement de commandes ou pipe). find /c /v ""
permet ensuite de compter le nombre de lignes du fichier reçu en entrée (ici, les lignes de sophie.txt
contenant poupée
).
Question :
Combien de caractères contient le fichier ? Utilisez un éditeur de texte ou bien la commande wc -m
pour vérifier le résultat. Attention : la commande wc
considère les sauts de ligne comme un caractère (mais pas Spark).
Correction
texteSophie.select(sum(length(texteSophie.value))).show()
Important : l’évaluation est « paresseuse » : les opérations sont effectuées seulement quand un résultat doit être retourné. Par exemple, dans la séquence
texteSophie = spark.read.text("tpintro/sophie.txt")
longueursLignes = texteSophie.select(length(texteSophie.value))
longueursLignes.select(sum("length(value)")).show()
les données ne sont pas chargées en mémoire après la première ligne et longueursLignes
n’est pas construit immédiatement après la transformation de la seconde ligne. Ce n’est qu’au moment où l’action show
doit être exécutée que Spark partitionne les calculs à faire en tâches pour les différentes machines (et/ou cœurs) et chaque machine (et/ou cœur) exécute sa partie de calcul de longueurs de lignes (pour les lignes stockées localement) et de somme, avant de retourner la réponse au programme driver qui fait la somme finale (et qui contrôle l’exécution).
En conséquence, lorsque pyspark
vous signale une erreur, il est possible que cette erreur ne provienne pas de la dernière instruction que vous avez écrite mais d’une instruction antérieure. Pour cela, lorsque vous mettez en place une chaîne de traitement en vous servant de pyspark
, il est utile d’entrer de temps en temps des actions simples (comme par ex. longueursLignes.take(2)
) pour identifier plus facilement les éventuelles erreurs.
Note
Si vous souhaitez supprimer les nombreuses lignes d’information qui précèdent la réponse (vous pouvez faire cela sur votre installation personnelle de Spark, pas sur celle de la salle de TP), dans le répertoire conf
copiez log4j.properties.template
en log4j.properties
et, dans ce dernier fichier, ligne log4j.rootCategory=INFO
, remplacez INFO
par WARN
. Dans une fenêtre terminal (bash
), à ouvrir séparément de la fenêtre Spark, entrez les commandes suivantes (remplacer /opt/spark
par le répertoire dans lequel spark
est installé sur votre ordinateur personnel) :
cd /opt/spark/conf
sudo cp log4j.properties.template log4j.properties
sudo gedit log4j.properties
Dans gedit
, ligne log4j.rootCategory=INFO
, remplacez INFO
par WARN
. Pour rappel, ci-dessus $
est le prompt système dans la fenêtre bash
.
Persistance¶
Sur le nœud driver comme sur chaque nœud de calcul, Spark s’exécute dans une machine virtuelle Java. Par conséquent, une structure de données de type RDD ou DataFrame peut être supprimée de la mémoire gérée par la machine virtuelle Java si plus de place est nécessaire pour une nouvelle structure de données. La structure de données supprimée sera créée de nouveau (potentiellement recalculée à partir de ce qui est stocké sur le stockage de masse, non volatile) si elle est de nouveau nécessaire dans des calculs. Or, cette opération peut être très coûteuse.
Il est possible d’éviter (dans la mesure du possible) la suppression des structures de données dont nous savons à l’avance qu’elles sont de nouveau nécessaires plus tard en les déclarant comme prioritaires. Pour cela, deux fonctions sont disponibles : .cache()
, qui emploie le niveau de stockage courant (par défaut : en mémoire vive ou sur stockage de masse), et .persist()
, qui permet de préciser le niveau de stockage (en mémoire vive seulement, ou en mémoire vive ou sur stockage de masse, etc.).
Exemple :
texteSophie = spark.read.text("tpintro/sophie.txt")
longueursLignes = texteSophie.select(length(texteSophie.value)).cache()
Utilisation de SQL¶
Il est possible de décrire le schéma d’un DataFrame :
texteSophie.printSchema()
longueursLignes.printSchema()
Dans ce cas, une seule colonne est présente et se nomme « value » (nom par défaut). Cette colonne contient des valeurs de type string (chaînes de caractères, c’est-à-dire du texte).
Pour rendre le DataFrame texteSophie
plus complexe, nous pouvons lui ajouter une colonne avec les longueurs des lignes correspondantes :
texteSophieNouveau = texteSophie.withColumn("lengths", length("value"))
texteSophieNouveau.show(5)
Nous pouvons obtenir un nouveau DataFrame comme résultat d’une requête SQL exécutée sur un premier DataFrame déclaré comme table SQL (vue temporaire) :
# Création d'une vue SQL temporaire "lignes" correspondant au DataFrame texteSophie
texteSophie.createOrReplaceTempView("lignes")
# Application d'une requête SQL sur la vue temporaire
lignesPoupee = spark.sql("SELECT value FROM lignes WHERE value LIKE '%poupée%'")
lignesPoupee.show()
Question :
Que contient lignesPoupee
?
Correction
lignesPoupee
est un DataFrame composé des lignes de texteSophie
qui contiennent poupée
.
Un autre exemple, sur un DataFrame à deux colonnes :
# Création d'une vue SQL temporaire "lignesAvecLongueurs"
texteSophieNouveau.createOrReplaceTempView("lignesAvecLongueurs")
# Application d'une requête SQL sur la vue temporaire
lignesLongues = spark.sql("SELECT value FROM lignesAvecLongueurs WHERE lengths > 20")
lignesLongues.show()
Question :
Que contient lignesLongues
?
Correction
lignesLongues
est un DataFrame composé des lignes de texteSophie
dont la longueur est supérieure à 20 caractères.