Travaux pratiques - Introduction à Spark

(une variante de ce TP utilisant le langage Scala)

Références externes utiles :

L’objectif de cette première séance de TP est d’introduire l’interpréteur de commandes de Spark en langage Python, quelques opérations de base sur les structures de données distribuées que sont les DataFrame, ainsi que quelques notions simples et indispensables concernant le langage Python.

Pour les séances de travaux pratiques, Spark est installé sur un système d’exploitation Linux. Si vous n’êtes pas familier avec les commandes Linux, il est utile de consulter au préalable ce site.

Les exemples ci-dessous reprennent, en partie, ceux des documentations en ligne de Spark.

Dans les salles de travaux pratiques (TP), lors du démarrage de l’ordinateur, choisissez la configuration openSUSE. Après l’ouverture de session, ouvrez une fenêtre de type terminal (par exemple, Konsole ou Qterminal). Dans cette fenêtre vous pouvez entrer des commandes Linux qui sont traitées par un interpréteur de commandes spécifiques (bash dans le cas particulier des salles de TP, terme générique shell).

Dans les salles, Spark est installé directement dans Linux (distribution openSUSE) et, dans la suite, certaines commandes système sont spécifiques à cette configuration. Si vous installez Spark d’une autre façon (machine virtuelle ou machine personnelle), il sera peut-être nécessaire d’adapter les commandes système.

Spark : concepts de base avec exemples

Il est possible d’utiliser Spark à partir des langages Java, Scala, Python ou (dans une moindre mesure) R

  • à travers une interface en ligne de commandes (en Scala ou Python), pratique aussi bien pour des tests interactifs que pour l’étape de mise au point d’une nouvelle application,

  • en écrivant (et en exécutant ensuite) un programme (voir la séance de TP suivante).

Certaines librairies ayant été développées seulement en Scala, une application écrite en Java ou en Python doit les appeler pour employer leurs fonctionnalités. Dans ces séances de TP nous nous servirons exclusivement de Python.

Lancement de l’interpréteur de commandes en Python et opérations simples

Pour ce premier TP, nous utiliserons l’interpréteur de commandes pyspark. Il s’agit d’une invite de commandes interactive permettant de communiquer directement avec un cluster Spark local.

Nous allons travailler avec un fichier texte contenant la première partie du livre Les Malheurs de Sophie de la comtesse de Ségur, téléchargeable ici. Commençons par ouvrir un terminal (Windows ou MacOS/Linux).

(attention : les commandes qui suivent %%bash sont à rentrer dans une fenêtre terminal ou invite de commandes)

%%bash
mkdir tpintro # Créé un dossier nommé tpintro
wget -nc https://cedric.cnam.fr/vertigo/Cours/RCP216/sophie.txt -P tpintro/
# wget permet de télécharger un fichier

Lancez ensuite dans cette fenêtre l’interpréteur de commandes pyspark en entrant

pyspark

Les commandes suivantes précédées par %%bash sont à entrer dans une fenêtre terminal (invite de commandes), celles non précédées par %%bash dans l’interpréteur pyspark. Sous Linux/MacOS, le répertoire courant d’une fenêtre terminal peut être connu avec la commande pwd. Le répertoire courant pour les commandes qui sont entrées dans pyspark est celui dans lequel pyspark a été lancé. Pour le connaître vous pouvez entrer dans pyspark :

import os
os.system("pwd")

Si un fichier à lire n’est pas dans le répertoire courant, il faut précéder son nom par le chemin relatif par rapport au répertoire courant.

Note

Si vous avez fait une faute de frappe ou de recopie et ne comprenez pas ce qui s’affiche dans la fenêtre, consultez le message d’erreur qui s’affiche afin de comprendre le problème. Les erreurs communes sont des fautes de frappe dans le nom des variables et des erreurs dans le syntaxe de python. En cas de blocage, vous pouvez faire appel à l’enseignant.

Dataset et DataFrame

Un Dataset est une collection distribuée de données. Il peut être vu comme une évolution conceptuelle des RDD (Resilient Distributed DataFrames), historiquement la première structure de données distribuée employée par Spark. Un DataFrame est un Dataset organisé en colonnes qui portent des noms, comme les tables d’une base de données. Avec l’interface de programmation en python, le type DataFrame est simplement l’alias du type Dataset[Row].

Il peut être utile de lire ces explications préparées par les créateurs de Spark pour mieux comprendre l’intérêt de chacune des interfaces de programmation (API) RDD et Dataset / DataFrame.

Le lancement de pyspark crée implicitement l’instance spark de SparkSession qui comporte un certain nombre d’informations de configuration (par ex., indique à Spark comment accéder à un cluster pour exécuter les commandes) et qui peut être employée directement. Dans d’autres cas, cette instance doit être créée explicitement avec

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

Créez maintenant un DataFrame à partir du fichier texte sophie.txt :

texteSophie = spark.read.text("tpintro/sophie.txt")

Dans la configuration actuelle de Spark dans la salle de travaux pratiques, c’est le système de fichiers (filesystem) local et non HDFS (distribué) qui est utilisé par défaut. Si vous entrez simplement .text("tpintro/sophie.txt"), Spark cherchera le fichier dans le système de fichiers par défaut, en suivant le chemin spécifié. Avec le préfixe file:// on indique que le fichier à lire est sur le système de fichiers local, avec le préfixe hdfs:// on indique qu’il s’agit plutôt de HDFS. L’utilisation d’un de ces préfixes exige l’emploi du chemin absolu vers le fichier correspondant.

Visualisez le fichier sophie.txt. Pour cela, il faut ouvrir une autre fenêtre terminal. Allez ensuite dans le répertoire où se trouve le fichier sophie.txt et examinez-le :

%%bash
cd tpintro
cat sophie.txt

La commande Linux/MacOS cat (sous Windows, type) permet d’afficher le contenu d’un fichier, en l’occurrence ici du texte.

Il est possible d’appliquer aux DataFrames des actions, qui retournent des valeurs, et des transformations, qui modifient les DataFrames ou en produisent de nouveaux.

Quelques actions simples :

texteSophie.count() # retourne le nombre de lignes dans ce DataFrame
texteSophie.first() # retourne les première ligne de ce DataFrame
texteSophie.take(5) # retourne les 5 premières lignes de ce DataFrame
texteSophie.collect() # retourne sur le *driver* le contenu complet de ce DataFrame et en affiche une partie à la console

Avec .collect(), lorsque le DataFrame en question est distribué, toutes ses données situées sur les nœuds de calcul sont d’abord regroupées sur le nœud driver. Cette opération très coûteuse si le DataFrame est grand, et qui peut engendrer des débordements de la mémoire sur le driver, donc à éviter autant que possible. Ensuite, une petite partie des données est affichée à la console.

Question :

À quoi correspond une ligne de ce DataFrame ?

L’exécution d’une action comme count sur un très grand DataFrame se déroule de la manière suivante : le DataFrame est composé de fragments, chacun stocké sur un nœud de calcul ; chaque fragment contient un (grand) nombre de lignes ; le programme driver de Spark envoie à chaque nœud le traitement à faire ; chaque nœud de calcul exécute le traitement sur le fragment local et transmet les résultats au driver. Pour d’autres actions (comme first) seul un fragment est nécessaire donc un seul nœud est sollicité.

Quelques transformations simples :

  • Construire un DataFrame lignesAvecPoupee comprenant seulement les lignes de texteSophie qui contiennent « poupée », retourner un tableau avec ses 2 premières lignes :

lignesAvecPoupee = texteSophie.filter(texteSophie.value.contains("poupée"))
lignesAvecPoupee.show(2) # affiche les 2 premières lignes de ce DataFrame
  • Construire un DataFrame longueursLignes composé des longueurs des lignes de texteSophie, retourner un tableau avec ses 5 premières lignes :

from pyspark.sql.functions import *  # pour utiliser fonctions comme length()
longueursLignes = texteSophie.select(length(texteSophie.value))
longueursLignes.show(5) # affiche les 5 premières lignes de ce DataFrame

Dans ces exemples, l’expression texteSophie.value retourne le contenu de l’unique colonne de texteSophie, appelée value.

L’exécution d’une transformation sur un très grand DataFrame se déroule de la manière suivante : le DataFrame est composé de fragments, chacun stocké sur un nœud de calcul ; le programme driver de Spark envoie à chaque nœud le traitement à faire ; chaque nœud de calcul exécute le traitement sur le fragment local et conserve les résultats localement.

Nous pouvons constater que l’unique colonne de longueursLignes a un nom complexe. Il peut être utile de renommer une colonne pour la manipuler plus facilement, per exemple :

longueursLignes.select(longueursLignes['length(value)'].alias("lengths")).show(5)

Enchaînements et évaluation « paresseuse »

Transformations et actions peuvent s’enchaîner :

Question :

Combien de lignes contiennent poupée ? Écrire la commande nécessaire dans pyspark et vérifier dans un terminal (bash) Linux.

Vous pouvez vérifier ce résultat de différentes façons, par exemple en utilisant un éditeur de texte (KWrite sur les machines de TP) pour compter les lignes. Il est toutefois possible d’obtenir cette valeur directement depuis la ligne de commande :

Dans un terminal :

%%bash
cd tpintro
grep poupée sophie.txt | wc -l

grep poupée sophie.txt renvoie les lignes du fichier d’entrée (ici sophie.txt envoyé par la commande précédente) qui contiennent le « motif » (pattern) poupée. Le caractère | indique que wc so la sortie de la commande qui précède ne doit pas être envoyée à la console mais transférée à la commande qui suit (c’est un enchaînement de commandes ou pipe Linux). Ces lignes sont donc envoyées vers la commande Linux wc -l qui affiche le nombre total de lignes du fichier reçu en entrée (ici, les lignes de sophie.txt contenant poupée).

Question :

Combien de caractères contient le fichier ? Utilisez un éditeur de texte ou bien la commande wc -m pour vérifier le résultat. Attention : la commande wc considère les sauts de ligne comme un caractère (mais pas Spark).

Important : l’évaluation est « paresseuse » : les opérations sont effectuées seulement quand un résultat doit être retourné. Par exemple, dans la séquence

texteSophie = spark.read.text("tpintro/sophie.txt")
longueursLignes = texteSophie.select(length(texteSophie.value))
longueursLignes.select(sum("length(value)")).show()

les données ne sont pas chargées en mémoire après la première ligne et longueursLignes n’est pas construit immédiatement après la transformation de la seconde ligne. Ce n’est qu’au moment où l’action show doit être exécutée que Spark partitionne les calculs à faire en tâches pour les différentes machines (et/ou cœurs) et chaque machine (et/ou cœur) exécute sa partie de calcul de longueurs de lignes (pour les lignes stockées localement) et de somme, avant de retourner la réponse au programme driver qui fait la somme finale (et qui contrôle l’exécution).

En conséquence, lorsque pyspark vous signale une erreur, il est possible que cette erreur ne provienne pas de la dernière instruction que vous avez écrite mais d’une instruction antérieure. Pour cela, lorsque vous mettez en place une chaîne de traitement en vous servant de pyspark, il est utile d’entrer de temps en temps des actions simples (comme par ex. longueursLignes.take(2)) pour identifier plus facilement les éventuelles erreurs.

Note

Si vous souhaitez supprimer les nombreuses lignes d’information qui précèdent la réponse (vous pouvez faire cela sur votre installation personnelle de Spark, pas sur celle de la salle de TP), dans le répertoire conf copiez log4j.properties.template en log4j.properties et, dans ce dernier fichier, ligne log4j.rootCategory=INFO, remplacez INFO par WARN. Dans une fenêtre terminal (bash), à ouvrir séparément de la fenêtre Spark, entrez les commandes suivantes (remplacer /opt/spark par le répertoire dans lequel spark est installé sur votre ordinateur personnel) :

cd /opt/spark/conf
sudo cp log4j.properties.template log4j.properties
sudo gedit log4j.properties

Dans gedit, ligne log4j.rootCategory=INFO, remplacez INFO par WARN. Pour rappel, ci-dessus $ est le prompt système dans la fenêtre bash.

Persistance

Sur le nœud driver comme sur chaque nœud de calcul, Spark s’exécute dans une machine virtuelle Java. Par conséquent, une structure de données de type RDD ou DataFrame peut être supprimée de la mémoire gérée par la machine virtuelle Java si plus de place est nécessaire pour une nouvelle structure de données. La structure de données supprimée sera créée de nouveau (potentiellement recalculée à partir de ce qui est stocké sur le stockage de masse, non volatile) si elle est de nouveau nécessaire dans des calculs. Or, cette opération peut être très coûteuse.

Il est possible d’éviter (dans la mesure du possible) la suppression des structures de données dont nous savons à l’avance qu’elles sont de nouveau nécessaires plus tard en les déclarant comme prioritaires. Pour cela, deux fonctions sont disponibles : .cache(), qui emploie le niveau de stockage courant (par défaut : en mémoire vive ou sur stockage de masse), et .persist(), qui permet de préciser le niveau de stockage (en mémoire vive seulement, ou en mémoire vive ou sur stockage de masse, etc.).

Exemple :

texteSophie = spark.read.text("tpintro/sophie.txt")
longueursLignes = texteSophie.select(length(texteSophie.value)).cache()

Utilisation de SQL

Il est possible de décrire le schéma d’un DataFrame :

texteSophie.printSchema()
longueursLignes.printSchema()

Dans ce cas, une seule colonne est présente et se nomme « value » (nom par défaut). Cette colonne contient des valeurs de type string (chaînes de caractères, c’est-à-dire du texte).

Pour rendre le DataFrame texteSophie plus complexe, nous pouvons lui ajouter une colonne avec les longueurs des lignes correspondantes :

texteSophieNouveau = texteSophie.withColumn("lengths", length("value"))
texteSophieNouveau.show(5)

Nous pouvons obtenir un nouveau DataFrame comme résultat d’une requête SQL exécutée sur un premier DataFrame déclaré comme table SQL (vue temporaire) :

# Création d'une vue SQL temporaire "lignes" correspondant au DataFrame texteSophie
texteSophie.createOrReplaceTempView("lignes")
# Application d'une requête SQL sur la vue temporaire
lignesPoupee = spark.sql("SELECT value FROM lignes WHERE value LIKE '%poupée%'")
lignesPoupee.show()

Question :

Que contient lignesPoupee ?

Un autre exemple, sur un DataFrame à deux colonnes :

# Création d'une vue SQL temporaire "lignesAvecLongueurs"
texteSophieNouveau.createOrReplaceTempView("lignesAvecLongueurs")
# Application d'une requête SQL sur la vue temporaire
lignesLongues = spark.sql("SELECT value FROM lignesAvecLongueurs WHERE lengths > 20")
lignesLongues.show()

Question :

Que contient lignesLongues ?