Travaux pratiques - Introduction à Spark et Scala

(la nouvelle version de cette séance, utilisant l’API DataFrame)

Références externes utiles :

Pour les séances de travaux pratiques, Spark est installé sur un système d’exploitation linux. Si vous n’êtes pas familier avec les commandes linux, il est utile de consulter au préalable ce site.

Les exemples ci-dessous reprennent, en partie, ceux des documentations en ligne de Spark.

Spark : concepts de base avec exemples

Dans la salle de travaux pratiques, lors du démarrage de l’ordinateur, choisissez la configuration Info-cloudera.... CDH5 (Cloudera) est déjà installé et inclut Spark. Dans la suite, certaines commandes système sont spécifiques à cette configuration. Si vous installez Spark d’une autre façon, il vous faudra adapter les commandes système.

Pour réaliser le projet, il sera nécessaire d’installer Spark sur un ordinateur auquel vous avez accès en permanence. Suivez ces instructions d”installation de Spark et signalez les éventuelles difficultés rencontrées (après avoir quand même cherché vous mêmes des solutions sur le web).

Il est possible d’utiliser Spark à partir des langages Java, Scala ou Python
  • à travers une interface en ligne de commandes (en Scala ou Python), pratique aussi bien pour des tests interactifs que pour l’étape de mise au point d’une nouvelle application,

  • en écrivant (et en exécutant ensuite) un programme (voir la séance de TP suivante).

Certaines librairies ayant été développées seulement en Scala, une application écrite en Java ou en Python doit les appeler pour employer leurs fonctionnalités.

Lancement de l’interpréteur de commandes en Scala et opérations simples

Vérifiez la présence du fichier LICENSE (LICENSE avec « S » car en anglais) et copiez-le dans un répertoire tpintro à créer (attention, [cloudera@quickstart ~]$ est l’invite de commandes linux, il ne faut pas l’entrer au clavier) :

[cloudera@quickstart ~]$ ls /usr/lib/spark/LICENSE
[cloudera@quickstart ~]$ mkdir tpintro
[cloudera@quickstart ~]$ cd tpintro
[cloudera@quickstart tpintro]$ cp /usr/lib/spark/LICENSE .
[cloudera@quickstart tpintro]$ export PATH="$PATH:/usr/lib/spark/bin"

Lancez ensuite dans cette fenêtre l’interpréteur de commandes spark-shell en entrant

[cloudera@quickstart tpintro]$ spark-shell

Créez un Resilient Distributed Dataset (RDD) à partir du fichier texte LICENSE en entrant dans la même fenêtre (attention, ici scala> est l’invite de commandes de l’interpréteur Spark, n’entrez pas cela sur la ligne de commande de l’interpréteur) :

scala> val texteLicence = sc.textFile("file:///home/cloudera/tpintro/LICENSE")

Si vous avez fait une faute de frappe ou de recopie et ne comprenez pas ce qui s’affiche dans la fenêtre, appelez l’enseignant ou fermez la session Spark en appuyant simultanément sur les touches Ctrl et d du clavier. Il faudra ensuite relancer spark-shell.

sc est un objet SparkContext qui indique à Spark comment accéder à un cluster pour exécuter les commandes. Dans ce TP les commandes seront exécutées sur un seul nœud, qui est celui sur lequel vous entrez les commandes (et sur lequel tourne le programme driver de Spark).

Visualisez le fichier LICENSE. Pour cela, il faut ouvrir une seconde fenêtre terminal. Allez ensuite dans le répertoire où se trouve le fichier LICENSE et examinez-le :

[cloudera@quickstart ~]$ cd tpintro
[cloudera@quickstart tpintro]$ more LICENSE

Avec la commande linux more il faut appuyer sur la touche espace pour passer à la page suivante. Sous linux, pour voir des explications concernant une commande comme more il faut entrer man nom_commande (par exemple man more) dans une fenêtre terminal (shell).

Il est possible d’appliquer aux RDD des actions, qui produisent des valeurs, et des transformations, qui produisent de nouveaux RDD.

Quelques actions simples :

scala> texteLicence.count() // nombre d'items dans ce RDD

scala> texteLicence.first() // premier item de ce RDD

scala> texteLicence.take(5) // 5 premiers items de ce RDD

scala> texteLicence.collect() // retourne le contenu de ce RDD

Question : à quoi correspond un « item » de ce RDD ? Regardez la documentation (voir les liens du début du TP).

L’exécution d’une action comme count sur un très grand RDD se déroule de la manière suivante : le RDD est composé de fragments, chacun stocké sur un nœud de calcul ; chaque fragment contient un (grand) nombre d’items ; le programme driver de Spark envoie à chaque nœud le traitement à faire ; chaque nœud de calcul exécute le traitement sur le fragment local et transmet les résultats au driver. Pour d’autres actions (comme first) seul un fragment est nécessaire donc seul un nœud est sollicité.

Quelques transformations simples :

  • Construire un RDD comprenant seulement les lignes de texteLicence qui contiennent « Copyright », retourner un tableau avec ses 2 premiers items :

scala> val lignesAvecCopyright = texteLicence.filter(line => line.contains("Copyright"))
scala> lignesAvecCopyright.take(2) // tableau avec les 2 premiers items de ce RDD
  • Construire un RDD composé des longueurs des lignes de texteLicence, retourner un tableau avec ses 5 premiers items :

scala> val longueursLignes = texteLicence.map(l => l.length)
scala> longueursLignes.take(5) // tableau avec les 5 premiers items de ce RDD

Dans ces exemples, les expressions l => l.contains("Copyright") et l => l.length sont des définitions de « fonctions anonymes » (voir les éléments de Scala plus loin). Elles indiquent comment transformer chaque item du RDD ; l est ainsi implicitement assimilé à un item générique du RDD. Il est possible d’utiliser le nom de votre choix à la place de l, comme ligne, line, x, toto, etc.

L’exécution d’une transformation comme map sur un très grand RDD se déroule de la manière suivante : le RDD est composé de fragments, chacun stocké sur un nœud de calcul ; le programme driver de Spark envoie à chaque nœud le traitement à faire ; chaque nœud de calcul exécute le traitement sur le fragment local et conserve les résultats localement.

Enchaînements de transformations et d’actions :

Question :

Combien de lignes contiennent Copyright ? Ecrire la commande nécessaire dans spark-shell et vérifier dans un terminal (shell) linux.

Pour vérifier, dans le second terminal pour commandes linux (et non celui avec spark-shell), entrer :

[cloudera@quickstart ~]$ cd tpintro
[cloudera@quickstart ~]$ cat LICENSE | grep Copyright | wc

cat LICENSE retourne le contenu du fichier LICENSE. Le caractère | indique que la sortie de la commande qui précède ne doit pas être retourné à la console mais envoyé vers la commande qui suit (c’est un enchaînement de commandes ou pipe linux). grep Copyright retourne les lignes du fichier d’entrée (ici LICENSE envoyé par la commande précédente) qui contiennent le « motif » (pattern) Copyright. Ces lignes sont envoyées (encore |) vers la commande linux wc qui retourne le nombre total de lignes, de « mots » (séquences de caractères séparées par des espaces et assimilés) et de caractères du fichier reçu en entrée (ici, les lignes de LICENSE contenant Copyright).

Question :

Combien de caractères contient le fichier ?

Important : l’évaluation est « paresseuse » : les opérations sont effectuées seulement quand un résultat doit être retourné. Par exemple, dans la séquence

scala> val texteLicence = sc.textFile("file:///home/cloudera/tpintro/LICENSE")
scala> val longueursLignes = texteLicence.map(l => l.length)
scala> longueursLignes.reduce((a, b) => a + b)

les données ne sont pas chargées en mémoire après la première ligne, longueursLignes n’est pas construit immédiatement après la transformation map de la seconde ligne, ce n’est qu’au moment où l’action reduce doit être exécutée que Spark partitionne les calculs à faire en tâches pour les différentes machines (et/ou cœurs) et chaque machine (et/ou cœur) exécute sa partie de map et de reduce, avant de retourner la réponse au programme driver (qui contrôle l’exécution). Lorsque spark-shell vous signale une erreur, il est possible que cette erreur ne provienne pas de la dernière instruction que vous avez écrite mais d’une instruction antérieure. Pour cela, lorsque vous mettez en place une chaîne de traitement en vous servant de spark-shell, il est utile d’entrer de temps en temps des actions simples comme longueursLignes.take(2) pour identifier plus facilement les éventuelles erreurs.

Examinons plus attentivement l’action reduce((a, b) => a + b) de l’exemple précédent. L’expression (a, b) => a + b est une définition de « fonction anonyme » (voir les éléments de Scala plus loin) qui indique comment transformer une paire d’items du RDD ; a et b sont ainsi implicitement assimilés à deux items génériques du RDD. Il est possible d’utiliser le nom de votre choix à la place de a ou b. Le déroulement de cette action est le suivant : chaque nœud, sur le fragment local du RDD, remplace chaque paire d’items par leur somme, ensuite applique de nouveau cette opération jusqu’à ce qu’il obtienne une seule valeur (qui sera dans ce cas la somme des valeurs de tous les items) ; ces résultats sont transmis au driver qui les additionne pour obtenir la somme globale.

Si vous souhaitez supprimer les nombreuses lignes d’information qui précèdent la réponse, dans le répertoire conf copiez log4j.properties.template en log4j.properties et, dans ce dernier fichier, ligne log4j.rootCategory=INFO, remplacez INFO par WARN. Sur les ordinateurs de la salle de TP, par défaut vous n’avez pas le droit d’écrire dans ce répertoire, il vous faudra précéder les commandes de copie et d’appel de l’éditeur de sudo. Dans une fenêtre terminal (shell), à ouvrir séparément de la fenêtre Spark, entrez les commandes :

[cloudera@quickstart ~]$ cd /usr/lib/spark/conf
[cloudera@quickstart conf]$ sudo cp log4j.properties.template log4j.properties
[cloudera@quickstart conf]$ sudo gedit log4j.properties

Dans gedit, ligne log4j.rootCategory=INFO, remplacez INFO par WARN. [cloudera@quickstart conf]$ est le prompt système dans la fenêtre shell.

Actions

Dans les exemples précédents, count, first, take, collect et reduce sont des exemples d’actions.

Voici ci-dessous quelques actions parmi les plus utilisées. Consulter la documentation (Scala, Java, Python) pour une liste complète et des spécifications détaillées.

Action

Signification

reduce(func)

Agréger les éléments du RDD en utilisant la fonction func (qui prend 2 arguments et retourne 1 résultat). La fonction devrait être associative et commutative afin de pouvoir être correctement calculée en parallèle.

collect()

Retourner tous les items du RDD comme un tableau au programme driver. A utiliser seulement si le RDD a un volume faible (par ex., après des opérations de type filter très sélectives).

count()

Retourner le nombre de items du le RDD.

take(n)

Retourner un tableau avec les n premiers items du RDD.

first()

Retourner le premier item du RDD (similaire à take(1)).

countByKey()

Pour RDD de type (clé, valeur), retourne l’ensemble de paires (clé, Int) avec le nombre de valeurs pour chaque clé.

takeSample(withReplacement,

num, [seed])

Retourne un tableau avec un échantillon aléatoire de num items du RDD, avec ou sans remplacement, avec une possible spécification de seed pour le générateur aléatoire.

saveAsTextFile(path)

Ecrit les items du RDD dans un fichier texte dans un répertoire du système de fichiers local, HDFS ou autre fichier supporté par Hadoop. Spark appelle toString pour convertir chaque item en une ligne de texte dans le fichier.

saveAsSequenceFile(path)

(Java et Scala seulement !) Ecrit les items du RDD sous forme de Hadoop SequenceFile dans un répertoire du système de fichiers local, HDFS ou autre fichier supporté par Hadoop. Disponible pour RDD de paires (clé,valeur) qui implémentent l’interface Writable de Hadoop. En Scala, disponible aussi pour des types implicitement convertibles à Writable (Int, Double, String,etc.).

saveAsObjectFile(path)

(Java et Scala seulement !) Ecrit les éléments du RDD en un format simple utilisant la sérialisation Java, qui peut être lu ensuite avec SparkContext.objectFile().

Transformations et persistance

Les transformations peuvent
  • produire un RDD à partir d’un autre RDD : map, filter, reduceByKey, etc.

  • produire un RDD à partir de deux RDD : union, join, cartesian, etc.

  • produire 0 ou plusieurs RDD à partir d’un RDD : flatMap.

Un exemple avec flatMap : à partir de texteLicence, obtenir un RDD motsTexteLicence ayant pour items les mots des différentes lignes et le rendre persistant pour faciliter des traitements ultérieurs.

scala> val motsTexteLicence = texteLicence.flatMap(line => line.split(" "))
scala> motsTexteLicence.persist()

Les lignes (items du RDD texteLicence) sont divisées en mots séparés par des espaces. motsTexteLicence n’est pas calculé immédiatement (évaluation paresseuse) mais le sera au moment où un résultat devra être retourné. Avec persist, Spark cherchera à conserver ce RDD en mémoire une fois qu’il sera calculé.

Question :

Comptez le nombre d’occurrences de chaque mot (regardez reduceByKey dans le tableau des transformations), affichez les 5 premiers items résultants.

Question :

Triez les items en ordre alphabétique des mots (regardez sortByKey dans le tableau des transformations), retournez les 5 premiers.

Bien entendu, il est possible d’enchaîner directement les opérations (écrire la totalité sur une seule ligne ou démarrer la ligne suivante par | suivie d’un espace avant d’écrire la suite sur la seconde ligne) :

scala> texteLicence.flatMap(line => line.split(" ")).map(word => (word, 1))
                   .reduceByKey((a, b) => a + b).sortByKey(true).take(5)

Noter que dans les fonctions anonymes line => line.split(" ") et word => (word, 1) il est possible d’utiliser une autre notation pour les items des RDD correspondants, par exemple a => a.split(" ") et a => (a, 1) ; les mots « line » et « word » n’ont pas de signification particulière pour le programme (ils rappellent simplement au programmeur la signification des items de ces RDD).

Voici quelques transformation parmi les plus utilisées. Consulter la documentation (Scala, Java, Python) pour une liste complète et des spécifications détaillées.

Transformation

Signification

map(func)

Returne un nouveau RDD obtenu en appliquant la fonction func à chaque item du RDD de départ.

filter(func)

Returne un nouveau RDD obtenu en sélectionnant les items de la source pour lesquels la fonction func retourne « vrai ».

flatMap(func)

Similaire à map mais chaque item du RDD source peut être transformé en 0 ou plusieurs items ; retourne une séquence (Seq) plutôt qu’un seul item.

sample(withReplacement,

fraction, seed)

Retourne un RDD contenant une fraction aléatoire fraction du RDD auquel la transformation s’applique, avec ou sans remplacement, avec une spécification de seed pour le générateur aléatoire.

union(otherDataset)

Retourne un RDD qui est l’union des items du RDD source et du RDD argument (otherDataset).

intersection(otherDataset)

Retourne un RDD qui est l’intersection des items du RDD source et du RDD argument (otherDataset).

distinct()

Retourne un RDD qui est obtenu du RDD source en éliminant les doublons des items.

groupByKey([numTasks])

Pour RDD de type (clé, valeur), retourne un RDD de paires (clé, Iterable<valeur>). Si le regroupement est réalisé en vue d’opérations d’agrégation (somme, moyenne), de meilleures performances peuvent être obtenues avec reduceByKey ou combineByKey.

reduceByKey(func, [numTasks])

Pour RDD de type (clé, valeur), retourne un RDD de paires (clé, valeur) où les valeurs pour chaque clé sont agrégées grâce à la fonction func de type (valeur,valeur) => valeur.

aggregateByKey(zeroValue)

(seqOp,combOp,[numTasks])

Pour RDD de type (clé, valeur), retourne un RDD de paires (clé, U) où les valeurs pour chaque clé sont agrégées grâce aux fonctions de combinaison seqOp, combOp et à une valeur neutre zeroValue. Le type de U peut être différent du type des valeurs de départ.

sortByKey([ascending], [numTasks])

Pour RDD de type (clé, valeur) où la clé implémente Ordered, retourne un RDD de paires (clé, valeur) trié par ordre ascendant ou descendant des clés (suivant la valeur de vérité de [ascending].

join(otherDataset, [numTasks])

Pour RDD de type (K, V) (ou (clé, valeur)) et otherDataset de type (K,W), retourne un RDD de paires (K, (V, W)) avec toutes les paires d’items pour chaque clé. Jointures externes avec leftOuterJoin, rightOuterJoin, and fullOuterJoin.

cogroup(otherDataset, [numTasks])

Pour RDD de type (K, V) (ou (clé, valeur)) et otherDataset de type type (K,W), retourne un RDD de paires (K, (Iterable<V>, Iterable<W>)). Synonyme de groupWith.

cartesian(otherDataset)

Pour RDD de type T et otherDataset de type U, retourne un RDD de type (T, U) (toutes les paires d’items).

pipe(command, [envVars])

Pipe de chaque partition à travers une commande shell (par ex. script Perl ou bash). Les items de RDD sont écrits sur stdin du process et les sorties du process (stdout) sont retournées comme un RDD de strings.

Question :

Retournez la valeur maximale et la moyenne du nombre de mots des lignes du fichier.

Création de RDD, stockage de RDD

Il y a deux possibilités pour créer un RDD :
  1. Référencer des données externes.

  2. Paralléliser une collection existante.

Spark permet de créer un RDD à partir de toute source de données acceptée par Hadoop (fichier local, HDFS, HBase, etc.). Quelques exemples :

  • SparkContext.textFile : lit le fichier donné en paramètre (sous forme de URI) et construit un RDD dont les items sont les lignes du fichier texte ;

  • SparkContext.wholeTextFiles : lit un répertoire contenant de nombreux fichiers texte de petite taille et retourne un RDD dans lequel chacun de ces fichiers constitue un item de type paire (nomFichier, contenuFichier).

  • SparkContext.sequenceFile[K, V] : lit un fichier-séquence avec K et V les types des clés et respectivement des valeurs dans la séquence. K et V devraient être des sous-classes de l’interface Hadoop Writable, comme IntWritable et Text.

  • SparkContext.hadoopRDD : pour autres InputFormats de Hadoop. Utiliser plutôt SparkContext.newHadoopRDD pour des formats d’entrée basés sur la « nouvelle » API MapReduce (org.apache.hadoop.mapreduce).

Note

Si l’URI correspond à un fichier local, il faut s’assurer que le fichier est accessible avec le même URI aux workers. Toutes les méthodes spark de création de RDD à partir de fichiers peuvent fonctionner sur des répertoires, des fichiers compressés et utiliser des wildcards. Par exemple : textFile("/my/directory"), textFile("/my/directory/*.txt") et textFile("/my/directory/*.gz").

Dans la configuration actuelle de Spark dans la salle de travaux pratiques, c’est le système de fichiers (filesystem) HDFS (distribué) qui est utilisé par défaut. Si vous entrez simplement sc.textFile("/home/cloudera/tpintro/LICENSE"), Spark cherchera le fichier dans le système de fichiers HDFS et ne le trouvera pas dans notre exemple. Avec le préfixe file:// on indique que le fichier à lire est sur le système de fichiers local.

En revanche, lorsque la configuration par défaut utilise le système de fichiers local, pour indiquer un fichier HDFS il faut employer le préfixe hdfs://.

Un RDD peut également être créé en parallélisant une collection existante, par un appel à la méthode SparkContext.parallelize avec une collection (Scala Seq) présente dans le programme driver. Les éléments de la collection sont distribués pour former un RDD sur lequel des opérations pourront être effectuées en parallèle. Par exemple, voici comment créer un RDD à partir d’un (très petit) tableau contenant les nombres de 1 à 5 :

scala> val donnees = Array(1, 2, 3, 4, 5)
scala> val donneesDistribuees = sc.parallelize(donnees)

Spark essaye de partitionner le RDD automatiquement à partir des caractéristiques du cluster sur lequel les calculs doivent être réalisés. Il est toutefois possible d’indiquer un nombre précis de partitions dans le second paramètre de la méthode parallelize, par exemple sc.parallelize(donnees, 30). Une partition correspond à une tâche et il est conseillé d’avoir entre 2 et 4 tâches par processeur (ou cœur).

Un RDD peut être sauvegardé (voir aussi le tableau des Actions, plus haut) :
  • sous forme de fichier texte avec saveAsTextFile(path) (Spark appelle toString pour convertir chaque item du RDD en une ligne de texte),

  • sous forme de SequenceFile Hadoop avec saveAsSequenceFile(path),

  • dans un format simple en utilisant la sérialisation Java avec saveAsObjectFile(path).

path indique un répertoire du système de fichiers local, HDFS ou autre fichier supporté par Hadoop.

Scala : concepts de base avec exemples

Scala est à la fois un langage objet pur, dans lequel chaque valeur est un objet (contrairement à Java), et un langage fonctionnel, chaque fonction étant également une valeur (donc un objet). Pour référence et un éventuel apprentissage plus approfondi du langage :

Fonctions anonymes

Les « fonctions anonymes » sont très utiles dans la définition d’opérations simples à appliquer aux données. Les expressions vues plus haut, comme (a, b) => a + b, (a, b) => if (a > b) a else b ou l => l.split(" ").size sont des définitions de fonctions anonymes. Les deux premières fonctions ont deux arguments chacune, la troisième fonction a un seul argument. La définition (a, b) => a + b est, pour des arguments qui sont des Int, une version courte de la définition suivante :

new Function1[Int, Int, Int] {
    def apply(x: Int, y: Int): Int = x + y
}

(c’est une définition formelle, non directement interprétable, ne l’entrez pas au clavier)

Il est possible de définir des fonctions anonymes avec plus de deux paramètres, par exemple (x: Int, y: Int, z: Int) => x + y - z. Une telle fonction peut être appelée directement :

scala> ((x: Int, y: Int, z: Int) => x + y - z)(1,2,3) // exemple appel fonction anonyme

Enfin, une fonction anonyme peut n’avoir aucun paramètre, par exemple () => { System.getProperty("user.dir") }, qui peut être appelée directement :

scala> (() => { System.getProperty("user.dir") })()  // exemple appel fonction anonyme

Note

Dans les expressions vues plus tôt, comme (a, b) => a + b employée par exemple dans longueursLignes.reduce((a, b) => a + b), le type des arguments n’était pas explicitement indiqué car il était donné par le type des items qui composent le RDD longueursLignes.

Question :

Séparez en mots la phrase « Stat Roma pristina nomine, nomina nuda tenemus » à l’aide d’une fonction anonyme.

Listes en compréhension (Sequence Comprehensions)

Les « listes en compréhension » sont des listes dont le contenu est défini par filtrage du contenu d’une autre liste, contrairement à l’approche plus classique de la construction de liste par énumération de ses éléments.

Scala propose une notation facile pour définir des listes par compréhension : for (enumerators) yield e, où enumerators est une liste de enumerators séparés par des « ; ». Un enumerator est soit un générateur qui introduit une ou plusieurs nouvelles variables, soit un filtre. Cette construction évalue l’expresssion e pour chaque association (binding) générée par les enumerators et retourne une séquence de ces valeurs.

Voici un exemple :

scala> def odd(from: Int, to: Int) = for (i <- from until to if i % 2 != 0) yield i
scala> println(odd(0, 20))

L’expression for dans la définition de la fonction odd introduit une nouvelle variable i: Int qui prend ensuite toutes les valeurs générées par l’itérateur _ until _ qui passent le test if i % 2 != 0 (ce test élimine les valeurs paires). Ici l’expression e de la syntaxe générale est réduite à la variable i. L’expression for retourne donc un tableau avec les nombres impaires entre from et to - 1.