Travaux pratiques - Introduction à Spark et Scala¶
(une variante de ce TP utilisant le langage Python)
Références externes utiles :
L’objectif de cette première séance de TP est d’introduire l’interpréteur de commandes de Spark en langage Scala, quelques opérations de base sur les structures de données distribuées que sont les DataFrame, ainsi que quelques notions simples et indispensables concernant le langage Scala.
Pour les séances de travaux pratiques, Spark est installé sur un système d’exploitation Linux. Si vous n’êtes pas familier avec les commandes Linux, il est utile de consulter au préalable ce site.
Les exemples ci-dessous reprennent, en partie, ceux des documentations en ligne de Spark.
Dans les salles de travaux pratiques (TP), lors du démarrage de l’ordinateur, choisissez la configuration openSUSE
. Après l’ouverture de session, ouvrez une fenêtre de type terminal (par exemple, Konsole ou Qterminal). Dans cette fenêtre vous pouvez entrer des commandes Linux qui sont traitées par un interpréteur de commandes spécifiques (bash
dans le cas particulier des salles de TP, terme générique shell
).
Dans les salles, Spark est installé directement dans Linux (distribution openSUSE
) et, dans la suite, certaines commandes système sont spécifiques à cette configuration. Si vous installez Spark d’une autre façon (machine virtuelle ou machine personnelle), il sera peut-être nécessaire d’adapter les commandes système.
Si vous suivez ce cours en formation à distance ou si vous souhaitez travailler chez vous (par exemple pour réaliser le projet de cette unité d’enseignement), il vous sera nécessaire d’installer Spark sur un ordinateur auquel vous avez accès en permanence. Suivez ces instructions d”installation de Spark et signalez les éventuelles difficultés rencontrées (après avoir quand même cherché vous-même des solutions dans des forums sur le web).
Les séances de travaux pratiques avec Spark peuvent être réalisées en utilisant Apache Zeppelin, une interface web pour Spark. Pour ce faire vous pouvez télécharger le cahier (notebook) Zeppelin à partir du bandeau en haut du TP (clic droit > Enregistrer sous) puis importer le fichier .json obtenu dans Zeppelin.
Si vous êtes inscrit au Cnam, vous avez accès au serveur Zeppelin RCP216 du département informatique: https://zeppelin.kali-service.cnam.fr (identifiants Siscol).
Spark : concepts de base avec exemples¶
Il est possible d’utiliser Spark à partir des langages Java, Scala, Python ou (dans une moindre mesure) R
à travers une interface en ligne de commandes (en Scala ou Python), pratique aussi bien pour des tests interactifs que pour l’étape de mise au point d’une nouvelle application,
en écrivant (et en exécutant ensuite) un programme (voir la séance de TP suivante).
Certaines librairies ayant été développées seulement en Scala, une application écrite en Java ou en Python doit les appeler pour employer leurs fonctionnalités. Dans ces séances de TP nous nous servirons exclusivement de Scala.
Lancement de l’interpréteur de commandes en Scala et opérations simples¶
Pour ce premier TP, nous utiliserons l’interpréteur de commandes spark-shell
. Il s’agit d’une invite de commandes interactive permettant de communiquer directement avec un cluster Spark local.
Nous allons travailler avec un fichier texte contenant la première partie du livre Les Malheurs de Sophie de la comtesse de Ségur, téléchargeable ici. Commençons par ouvrir un terminal (Windows ou MacOS/Linux).
(attention : les commandes qui suivent %%bash
sont à rentrer dans une fenêtre terminal ou invite de commandes)
%%bash
mkdir tpintro # Créé un dossier nommé tpintro
cd tpintro # Se place dans le dossier tpintro
wget https://cedric.cnam.fr/vertigo/Cours/RCP216/sophie.txt
# wget permet de télécharger un fichier
%%bash
mkdir tpintro # Créé un dossier nommé tpintro
cd tpintro # Se place dans le dossier tpintro
curl https://cedric.cnam.fr/vertigo/Cours/RCP216/sophie.txt -o sophie.txt
# curl permet de télécharger un fichier
Téléchargez le fichier sophie.txt
puis déplacez-le dans un répertoire tpIntro
. En ligne de commande, déplacez-vous dans ce dossier (cd chemin\vers\tpIntro\
).
Lancez ensuite dans cette fenêtre l’interpréteur de commandes spark-shell
en entrant
spark-shell
Les commandes suivantes précédées par %%bash
sont à entrer dans une fenêtre terminal (invite de commandes), celles non précédées par %%bash
dans l’interpréteur spark-shell
. Sous Linux/MacOS, le répertoire courant d’une fenêtre terminal peut être connu avec la commande pwd
. Le répertoire courant pour les commandes qui sont entrées dans spark-shell
est celui dans lequel spark-shell
a été lancé. Pour le connaître vous pouvez entrer dans spark-shell
:
import sys.process._
"pwd" !
Si un fichier à lire n’est pas dans le répertoire courant, il faut précéder son nom par le chemin relatif par rapport au répertoire courant.
Créez un Dataset à partir du fichier texte sophie.txt
:
val texteSophie = spark.read.textFile("sophie.txt")
Note
Si vous avez fait une faute de frappe ou de recopie et ne comprenez pas ce qui s’affiche dans la fenêtre, consultez le message d’erreur qui s’affiche afin de comprendre le problème. Les erreurs communes sont des fautes de frappe dans le nom des variables et des erreurs dans le syntaxe de Scala. En cas de blocage, vous pouvez faire appel à l’enseignant.
spark
est un objet SparkSession
qui comporte un certain nombre d’informations de configuration (par ex., indique à Spark comment accéder à un cluster pour exécuter les commandes). Dans ce TP les commandes seront exécutées sur un seul nœud, qui est celui sur lequel vous entrez les commandes et sur lequel tourne le programme driver de Spark.
Visualisez le fichier sophie.txt
. Pour cela, il faut ouvrir une autre fenêtre terminal. Allez ensuite dans le répertoire où se trouve le fichier sophie.txt
et examinez-le :
%%bash
cd tpintro
cat sophie.txt
%%bash
cd tpintro
type sophie.txt
La commande cat
(sous Windows, type
) permet d’afficher le contenu d’un fichier, en l’occurrence ici du texte.
Dataset et DataFrame¶
Un Dataset est une collection distribuée de données. Il peut être vu comme une évolution conceptuelle des RDD (Resilient Distributed Datasets), historiquement la première structure de données distribuée employée par Spark. Un DataFrame est un Dataset organisé en colonnes qui portent des noms, comme les tables d’une base de données. Avec l’interface de programmation en Scala, le type DataFrame
est simplement l’alias du type Dataset[Row]
.
Il est utile de lire ces explications préparées par les créateurs de Spark pour mieux comprendre l’intérêt de chacune des interfaces de programmation (API) RDD et Dataset / Dataframe.
Il est possible d’appliquer aux Datasets des actions, qui produisent des valeurs, et des transformations, qui produisent de nouveaux Datasets, ainsi que certaines fonctions qui n’entrent dans aucune de ces deux catégories.
Quelques actions simples :
texteSophie.count() // nombre de lignes dans ce Dataset
texteSophie.first() // première ligne de ce Dataset
texteSophie.take(5) // 5 premières lignes de ce Dataset
texteSophie.collect() // retourne une partie du contenu de ce Dataset
Question :
À quoi correspond une ligne de ce Dataset ?
L’exécution d’une action comme count
sur un très grand Dataset se déroule de la manière suivante : le Dataset est composé de fragments, chacun stocké sur un nœud de calcul ; chaque fragment contient un (grand) nombre de lignes ; le programme driver de Spark envoie à chaque nœud le traitement à faire ; chaque nœud de calcul exécute le traitement sur le fragment local et transmet les résultats au driver. Pour d’autres actions (comme first
) seul un fragment est nécessaire donc un seul nœud est sollicité.
Quelques transformations simples :
Construire un Dataset comprenant seulement les lignes de
texteSophie
qui contiennent « poupée », retourner un tableau avec ses 2 premières lignes :
val lignesAvecPoupee = texteSophie.filter(line => line.contains("poupée"))
lignesAvecPoupee.take(2) // tableau avec les 2 premières lignes de ce Dataset
Construire un Dataset composé des longueurs des lignes de
texteSophie
, retourner un tableau avec ses 5 premières lignes :
val longueursLignes = texteSophie.map(l => l.length)
longueursLignes.take(5) // tableau avec les 5 premières lignes de ce Dataset
Dans ces exemples, les expressions line => line.contains("poupée")
et l => l.length
sont des définitions de « fonctions anonymes » (voir les éléments de Scala plus loin). Elles indiquent comment transformer chaque ligne du Dataset ; line
ou l
sont ainsi implicitement assimilés à une ligne générique du Dataset. Il est possible d’utiliser le nom de votre choix à la place de line
ou l
, comme ligne
, x
, toto
, etc.
L’exécution d’une transformation comme map
sur un très grand Dataset se déroule de la manière suivante : le Dataset est composé de fragments, chacun stocké sur un nœud de calcul ; le programme driver de Spark envoie à chaque nœud le traitement à faire ; chaque nœud de calcul exécute le traitement sur le fragment local et conserve les résultats localement.
Enchaînements de transformations et d’actions :
Question :
Combien de lignes contiennent poupée
? Écrire la commande nécessaire dans spark-shell
et vérifier dans un terminal (bash
) Linux.
Vous pouvez vérifier ce résultat de différentes façons, par exemple en utilisant un éditeur de texte (KWrite sur les machines de TP) pour compter les lignes. Il est toutefois possible d’obtenir cette valeur directement depuis la ligne de commande :
Dans un terminal :
%%bash
cd tpintro
grep poupée sophie.txt | wc -l
grep poupée sophie.txt
renvoie les lignes du fichier d’entrée (ici sophie.txt
envoyé par la commande précédente) qui contiennent le « motif » (pattern) poupée
. Le caractère |
indique que la sortie de la commande qui précède ne doit pas être envoyée à la console mais transférée à la commande qui suit (c’est un enchaînement de commandes ou pipe Linux). Ces lignes sont donc envoyées vers la commande Linux wc -l
qui affiche le nombre total de lignes du fichier reçu en entrée (ici, les lignes de sophie.txt
contenant poupée
).
%%bash
cd tpintro
findstr poupée sophie.txt | find /c /v ""
findstr string file
recherche la chaîne de caractères string
dans le fichier file
. Le caractère |
indique que la sortie de la commande qui précède ne doit pas être retourné à la console mais envoyé vers la commande qui suit (c’est un enchaînement de commandes ou pipe). find /c /v ""
permet ensuite de compter le nombre de lignes du fichier reçu en entrée (ici, les lignes de sophie.txt
contenant poupée
).
Question :
Combien de caractères contient le fichier ? Utilisez un éditeur de texte ou bien la commande wc -m
pour vérifier le résultat. Attention : la commande wc
considère les sauts de ligne comme un caractère (mais pas Spark).
Important : l’évaluation est « paresseuse » : les opérations sont effectuées seulement quand un résultat doit être retourné. Par exemple, dans la séquence
val texteSophie = spark.read.textFile("sophie.txt")
val longueursLignes = texteSophie.map(l => l.length)
longueursLignes.reduce((a, b) => a + b)
les données ne sont pas chargées en mémoire après la première ligne et longueursLignes
n’est pas construit immédiatement après la transformation map
de la seconde ligne. Ce n’est qu’au moment où l’action reduce
doit être exécutée que Spark partitionne les calculs à faire en tâches pour les différentes machines (et/ou cœurs) et chaque machine (et/ou cœur) exécute sa partie de map
et de reduce
, avant de retourner la réponse au programme driver (qui contrôle l’exécution).
En conséquence, lorsque spark-shell
vous signale une erreur, il est possible que cette erreur ne provienne pas de la dernière instruction que vous avez écrite mais d’une instruction antérieure. Pour cela, lorsque vous mettez en place une chaîne de traitement en vous servant de spark-shell
, il est utile d’entrer de temps en temps des actions simples (comme longueursLignes.take(2)
ici) pour identifier plus facilement les éventuelles erreurs.
Examinons plus attentivement l’action reduce((a, b) => a + b)
de l’exemple précédent. L’expression (a, b) => a + b
est une définition de « fonction anonyme » (voir les éléments de Scala plus loin) qui indique comment transformer une paire de lignes du Dataset ; a
et b
sont ainsi implicitement assimilés à deux lignes génériques du Dataset. Il est possible d’utiliser le nom de votre choix à la place de a
/et/ou b
. Le déroulement de cette action est le suivant : chaque nœud, sur le fragment local du Dataset, remplace chaque paire de lignes par leur somme, ensuite applique de nouveau cette opération jusqu’à ce qu’il obtienne une seule valeur (qui sera dans ce cas la somme des valeurs de toutes les lignes) ; ces résultats sont transmis au driver qui les additionne pour obtenir la somme globale.
Note
Si vous souhaitez supprimer les nombreuses lignes d’information qui précèdent la réponse (vous pouvez faire cela sur votre installation personnelle de Spark, pas sur celle de la salle de TP), dans le répertoire conf
copiez log4j.properties.template
en log4j.properties
et, dans ce dernier fichier, ligne log4j.rootCategory=INFO
, remplacez INFO
par WARN
. Dans une fenêtre terminal (bash
), à ouvrir séparément de la fenêtre Spark, entrez les commandes :
cd /opt/spark/conf
sudo cp log4j.properties.template log4j.properties
sudo gedit log4j.properties
Dans gedit
, ligne log4j.rootCategory=INFO
, remplacez INFO
par WARN
. Pour rappel, ci-dessus $
est le prompt système dans la fenêtre bash
.
Actions¶
Dans les exemples précédents, count
, first
, take
, collect
et reduce
sont des exemples d’actions.
Voici ci-dessous quelques actions parmi les plus utilisées. Consulter la documentation (Scala) pour une liste complète et des spécifications détaillées.
reduce(func)
| Agréger les éléments du Dataset en utilisant la fonctionfunc
(qui prend 2 arguments et retourne 1 résultat). La fonction devrait être associative et commutative pour être correctement calculée en parallèle.collect()
| Retourner toutes les lignes du Dataset comme un tableau au programme driver. À utiliser seulement si le Dataset a un volume faible (par ex., après des opérations de typefilter
très sélectives).count()
| Retourner le nombre de lignes du Dataset.head(n)
| Retourner un tableau avec lesn
premières lignes du Dataset.take(n)
| Alias pourhead(n)
.first()
| Retourner la première ligne du Dataset (équivalent àtake(1)
ouhead()
).show(n)
| Afficher lesn
premières lignes du Dataset sous forme de tableau.foreach(func)
| Appliquer la fonctionfunc
à toutes les lignes.
Transformations et persistance¶
Les transformations peuvent
produire un Dataset à partir d’un autre Dataset :
map
,filter
,sort
, etc.produire un Dataset à partir de deux Dataset :
union
,join
,crossJoin
, etc.produire 0 ou plusieurs Dataset à partir d’un Dataset :
flatMap
.
Un exemple avec flatMap
: à partir de texteSophie
, obtenir un Dataset motstexteSophie
ayant comme lignes les mots des différentes lignes de texteSophie
et le rendre persistant pour faciliter des traitements ultérieurs.
texteSophie.flatMap(line => line.split(" ")).show()
La méthode .split()
permet de transformer une chaîne de caractères en une liste de mots séparés par un délimiteur. Ici, les lignes (du Dataset texteSophie
) sont divisées en mots séparés par des espaces. Il est possible d’utiliser plusieurs délimiteurs en les indiquant entre crochets ([
et ]
), par exemple pour séparer des mots accolés à une ponctuation.
val delimiteurs = "[ .;,?!--()_]"
val motstexteSophie = texteSophie.flatMap(line => line.split(delimiteurs))
motstexteSophie.persist()
motstexteSophie
n’est pas calculé immédiatement (évaluation paresseuse) mais le sera au moment où un résultat devra être retourné. Avec .persist()
(ou .cache()
), Spark cherchera à conserver ce Dataset en mémoire une fois qu’il sera calculé.
motstexteSophie.show()
Question :
Comptez le nombre d’occurrences de chaque mot et affichez les 30 premières lignes résultantes. Indication pour la solution : .groupByKey(identity).count()
permet de compter le nombre d’occurrences de chaque mot (les mots sont groupés par leur « identité », ensuite les occurrences comptées).
Question :
Retournez la valeur maximale et la moyenne du nombre de mots des lignes du fichier.
Bien entendu, il est possible d’enchaîner directement les opérations :
texteSophie.flatMap(line => line.split(delimiteurs)).groupByKey(identity).count().show(10)
Rappelons que dans les fonctions anonymes comme line => line.split(" ")
il est possible d’utiliser une autre notation pour les lignes des Datasets correspondants, par exemple a => a.split(" ")
; le mot « line » n’a pas de signification particulière pour le programme (il rappelle simplement au programmeur la signification des lignes de ces Datasets).
Il est possible de décrire le schéma d’un Dataset (DataFrame) :
motstexteSophie.printSchema()
Dans ce cas, une seule colonne est présente et se nomme « value » (nom par défaut). Cette colonne contient des valeurs de type string (chaînes de caractères, c’est-à-dire du texte).
Il est possible d’obtenir un nouveau DataFrame comme résultat d’une requête SQL :
// Création d'une vue SQL temporaire "mots"
motstexteSophie.createOrReplaceTempView("mots")
// Application d'une requête SQL sur la vue temporaire
val apresT = spark.sql("SELECT value FROM mots WHERE value > 't'")
apresT.show()
Question :
Que contient apresT
?
Voici maintenant quelques transformations parmi les plus utilisées. Consulter la documentation (Scala) pour une liste complète et des spécifications détaillées.
map(func)
| Retourne un nouveau Dataset obtenu en appliquant la fonctionfunc
à chaque ligne du Dataset de départ.filter(func)
| Retourne un nouveau Dataset obtenu en sélectionnant les lignes de la source pour lesquelles la fonctionfunc
retourne « vrai ».flatMap(func)
| Similaire àmap
mais chaque ligne du Dataset source peut être transformée en 0 ou plusieurs lignes ; retourne une séquence (Seq
) plutôt qu’une seule ligne. |sample(withReplacement,fraction,seed)
| Retourne un Dataset contenant une fraction aléatoirefraction
du Dataset auquel la transformation s’applique, avec ou sans remplacement, avec uneseed
pour le générateur aléatoire.union(otherDataset)
| Retourne un Dataset qui est l’union des lignes du Dataset source et du Dataset argument (otherDataset
).intersection(otherDataset)
| Retourne un Dataset qui est l’intersection des lignes du Dataset source et du Dataset argument (otherDataset
).distinct()
| Retourne un Dataset qui est obtenu du Dataset source en éliminant les doublons des lignes.groupByKey(func)
| Pour un Dataset, calcule la clé parfunc
et retourne un KeyValueGroupedDataset.join(otherDataset, joinExprs,
| Pour un Dataset jointure avecotherDataset
de typejoinType
avec condition de jointurejoinExprs
.crossJoin(otherDataset)
| Pour Dataset de type T etotherDataset
de type U, retourne un Dataset de type (T, U) (produit cartésien).
Création de Dataset, stockage de Dataset¶
- Il y a plusieurs possibilités pour créer un Dataset :
À partir d’un RDD (Resilient Distributed Dataset) existant.
À partir de données externes.
Transformer un (ou plusieurs) Dataset(s) existant(s).
Plusieurs solutions permettent d’obtenir un Dataset / DataFrame à partir d’un RDD. Le plus simple est de spécifier le schéma des données du DataFrame et ensuite d’appeler la méthode .createDataFrame(rowRDD, schema)
de SparkSession
. Afin d’obtenir un RDD à partir d’un Dataset / DataFrame il suffit d’appeler la méthode .rdd
de Dataset
/ DataFrame
.
Spark permet de créer un Dataset à partir de toute source de données acceptée par Hadoop (fichier local, HDFS, HBase, etc.). Quelques exemples :
SparkSession.read.textFile()
: lit le fichier donné en paramètre (sous forme d’URI) et construit un Dataset dont les lignes sont les lignes du fichier texte ;SparkSession.readStream...
: construction de Datasets à partir d’un flux de données.
Note
Si l’URI correspond à un fichier local, il faut s’assurer que le fichier est accessible avec le même URI aux workers.
Toutes les méthodes Spark de création de Dataset à partir de fichiers peuvent fonctionner sur des répertoires, des fichiers compressés et utiliser des wildcards. Par exemple : textFile("/my/directory")
, textFile("/my/directory/*.txt")
et textFile("/my/directory/*.gz")
.
Dans la configuration actuelle de Spark dans la salle de travaux pratiques, c’est le système de fichiers (filesystem) local et non HDFS (distribué) qui est utilisé par défaut. Si vous entrez simplement .textFile("sophie.txt")
, Spark cherchera le fichier dans le système de fichiers par défaut. Avec le préfixe file://
on indique que le fichier à lire est sur le système de fichiers local, avec le préfixe hdfs://
on indique qu’il s’agit plutôt de HDFS. L’utilisation d’un de ces préfixes exige l’emploi du chemin absolu vers le fichier correspondant.
Un Dataset peut être sauvegardé en utilisant Dataset.write...
, par exemple Dataset.write.text(path)
sous format texte. path
indique un répertoire du système de fichiers local (préfixe file://
), HDFS (préfixe hdfs://
) ou autre fichier supporté par Hadoop.
Scala : concepts de base avec exemples¶
Scala est à la fois un langage objet pur, dans lequel chaque valeur est un objet (contrairement à Java), et un langage fonctionnel, chaque fonction étant également une valeur (donc un objet). Pour référence et un éventuel apprentissage plus approfondi du langage :
Fonctions anonymes¶
Les « fonctions anonymes » sont très utiles dans la définition d’opérations simples à appliquer aux données. Les expressions vues plus haut, comme (a, b) => a + b
, (a, b) => if (a > b) a else b
ou l => l.split(" ").size
sont des définitions de fonctions anonymes. Les deux premières fonctions ont deux arguments chacune, la troisième fonction a un seul argument. La définition (a, b) => a + b
est, pour des arguments qui sont des Int
, une version courte de la définition suivante :
new Function1[Int, Int, Int] {
def apply(x: Int, y: Int): Int = x + y
}
(c’est une définition formelle, non directement interprétable, ne l’entrez pas au clavier)
Il est possible de définir des fonctions anonymes avec plus de deux paramètres, par exemple (x: Int, y: Int, z: Int) => x + y - z
. Une telle fonction peut être appelée directement :
((x: Int, y: Int, z: Int) => x + y - z)(1,2,3) // exemple appel fonction anonyme
Enfin, une fonction anonyme peut n’avoir aucun paramètre, par exemple () => { System.getProperty("user.dir") }
, qui peut être appelée directement :
(() => { System.getProperty("user.dir") })() // exemple appel fonction anonyme
Note
Dans les expressions vues plus tôt, comme (a, b) => a + b
employée par exemple dans longueursLignes.reduce((a, b) => a + b)
, le type des arguments n’était pas explicitement indiqué car il était donné par le type des lignes qui composent le Dataset longueursLignes
.
Question :
Séparez en mots la phrase « Stat Roma pristina nomine, nomina nuda tenemus » à l’aide d’une fonction anonyme.
Listes en compréhension (Sequence Comprehensions)¶
Les « listes en compréhension » sont des listes dont le contenu est défini par filtrage du contenu d’une autre liste, contrairement à l’approche plus classique de la construction de liste par énumération de ses éléments.
Scala propose une notation facile pour définir des listes par compréhension : for (enumerators) yield e
, où enumerators
est une liste de enumerators séparés par des « ; ». Un enumerator est soit un générateur qui introduit une ou plusieurs nouvelles variables, soit un filtre. Cette construction évalue l’expression e
pour chaque association (binding) générée par les enumerators et retourne une séquence de ces valeurs.
Voici un exemple :
def odd(debut: Int, fin: Int) = for (i <- debut until fin if i % 2 != 0) yield i
println(odd(0, 20))
L’expression for
dans la définition de la fonction odd
introduit une nouvelle variable i: Int
qui prend ensuite toutes les valeurs générées par l’itérateur _ until _
qui passent le test if i % 2 != 0
(ce test élimine les valeurs paires). Ici l’expression e
de la syntaxe générale est réduite à la variable i
. L’expression for
retourne donc un tableau avec les nombres impairs entre debut
et fin - 1
.
Question
À l’aide d’une liste en compréhension, produire une liste des carrés des nombres entiers entre 5 et 15, sauf 12.