.. _chap-tpSparkScala: ################################################# Travaux pratiques - Introduction à Spark et Scala ################################################# .. only:: html .. container:: notebook .. image:: _static/zeppelin_classic_logo.png :class: svg-inline `Cahier Zeppelin `_ (`la version précédente de cette séance, utilisant l'API RDD `_) Références externes utiles : * `Documentation Spark `_ * `Documentation API Spark en Scala `_ * `Documentation Scala `_ **L'objectif** de cette première séance de TP est d'introduire l'interpréteur de commandes de Spark en langage Scala, quelques opérations de base sur les structures de données distribuées que sont les *DataFrame*, ainsi que quelques notions simples et indispensables concernant le langage Scala. Pour les séances de travaux pratiques, Spark est installé sur un système d'exploitation Linux. Si vous n'êtes pas familier avec les commandes Linux, il est utile de consulter au préalable `ce site `_. Les exemples ci-dessous reprennent, en partie, ceux des documentations en ligne de Spark. .. Dans la salle de travaux pratiques, lors du démarrage de l'ordinateur, choisissez la configuration ``Info-cloudera...``. CDH5 (Cloudera) est déjà installé et inclut Spark. Dans la suite, certaines commandes système sont spécifiques à cette configuration. Si vous installez Spark d'une autre façon, il vous faudra adapter les commandes système. .. tabs:: .. tab:: En salle TP Dans les salles de travaux pratiques (TP), lors du démarrage de l'ordinateur, choisissez la configuration ``openSUSE``. Après l'ouverture de session, ouvrez une fenêtre de type terminal (par exemple, Konsole ou Qterminal). Dans cette fenêtre vous pouvez entrer des commandes Linux qui sont traitées par un interpréteur de commandes spécifiques (``bash`` dans le cas particulier des salles de TP, terme générique ``shell``). Dans les salles, Spark est installé directement dans Linux (distribution ``openSUSE``) et, dans la suite, certaines commandes système sont spécifiques à cette configuration. Si vous installez Spark d'une autre façon (machine virtuelle ou machine personnelle), il sera peut-être nécessaire d'adapter les commandes système. .. tab:: Sur votre machine personnelle Si vous suivez ce cours en formation à distance ou si vous souhaitez travailler chez vous (par exemple pour réaliser le projet de cette unité d'enseignement), il vous sera nécessaire d'installer Spark sur un ordinateur auquel vous avez accès en permanence. Suivez `ces instructions d'\ installation de Spark `_ et signalez les éventuelles difficultés rencontrées (après avoir quand même cherché vous-même des solutions dans des forums sur le web). .. tab:: Avec Zeppelin Les séances de travaux pratiques avec Spark peuvent être réalisées en utilisant Apache Zeppelin, une interface web pour Spark. Pour ce faire vous pouvez télécharger le cahier (*notebook*) Zeppelin à partir du bandeau en haut du TP (clic droit > Enregistrer sous) puis importer le fichier `.json` obtenu dans Zeppelin. Si vous êtes inscrit au Cnam, vous avez accès au serveur Zeppelin RCP216 du département informatique: ``_ (identifiants Siscol). *************************************** Spark : concepts de base avec exemples *************************************** Il est possible d'utiliser Spark à partir des langages Java, Scala, Python ou (dans une moindre mesure) R * à travers une interface en ligne de commandes (en Scala ou Python), pratique aussi bien pour des tests interactifs que pour l'étape de mise au point d'une nouvelle application, * en écrivant (et en exécutant ensuite) un programme (voir la séance de TP suivante). Certaines librairies ayant été développées seulement en Scala, une application écrite en Java ou en Python doit les appeler pour employer leurs fonctionnalités. Dans ces séances de TP nous nous servirons exclusivement de Scala. Lancement de l'interpréteur de commandes en Scala et opérations simples ======================================================================= Pour ce premier TP, nous utiliserons l'interpréteur de commandes ``spark-shell``. Il s'agit d'une invite de commandes interactive permettant de communiquer directement avec un *cluster* Spark local. .. only:: jupyter .. warning:: **Attention, si vous avez ouvert ce TP dans Zeppelin, ne passez pas par le *notebook* pour le faire. Ouvrez une invite de commandes et lancez spark-shell. L'objectif est de se familiariser avec l'interpréteur. Nous utiliserons Zeppelin dans les TP suivants.** Nous allons travailler avec un fichier texte contenant la première du livre *Les Malheurs de Sophie* de la comtesse de Ségur, téléchargeable `ici <./sophie.txt>`_. Commençons par ouvrir un terminal (Windows ou MacOS/Linux). .. only:: html (**attention** : les commandes qui suivent ``%%bash`` sont à rentrer dans le terminal) .. tabs:: .. group-tab:: Linux .. code-block:: bash %%bash mkdir tpintro # Créé un dossier nommé tpintro cd tpintro # Se place dans le dossier tpintro wget https://cedric.cnam.fr/vertigo/Cours/RCP216/sophie.txt # wget permet de télécharger un fichier .. group-tab:: MacOS .. code-block:: bash %%bash mkdir tpintro # Créé un dossier nommé tpintro cd tpintro # Se place dans le dossier tpintro curl https://cedric.cnam.fr/vertigo/Cours/RCP216/sophie.txt -o sophie.txt # curl permet de télécharger un fichier .. group-tab:: Windows Téléchargez le fichier ``sophie.txt`` puis déplacez le dans un répertoire ``tpIntro``. En ligne de commande, déplacez-vous dans ce dossier (``cd chemin\vers\tpIntro\``). .. only:: not jupyter Lancez ensuite dans cette fenêtre l'interpréteur de commandes ``spark-shell`` en entrant .. code-block:: bash spark-shell Créez un *Dataset* à partir du fichier texte ``sophie.txt`` en entrant dans l'interpréteur ``spark-shell`` : .. code-block:: scala val texteSophie = spark.read.textFile("sophie.txt") .. note:: Si vous avez fait une faute de frappe ou de recopie et ne comprenez pas ce qui s'affiche dans la fenêtre, consultez le message d'erreur qui s'affiche afin de comprendre le problème. Les erreurs communes sont des fautes de frappe dans le nom des variables et des erreurs dans le syntaxe de Scala. En cas de blocage, vous pouvez appeler l'enseignant. ``spark`` est un objet ``SparkSession`` qui comporte un certain nombre d'informations de configuration (par ex., indique à Spark comment accéder à un *cluster* pour exécuter les commandes). Dans ce TP les commandes seront exécutées sur un seul nœud, qui est celui sur lequel vous entrez les commandes et sur lequel tourne le programme *driver* de Spark. Visualisez le fichier ``sophie.txt``. Pour cela, il faut ouvrir une seconde fenêtre terminal. Allez ensuite dans le répertoire où se trouve le fichier ``sophie.txt`` et examinez-le : .. tabs:: .. group-tab:: Linux/MacOS .. code-block:: bash %%bash cat tpintro/sophie.txt .. group-tab:: Windows .. code-block:: bash %%bash type tpintro/sophie.txt La commande ``cat`` (sous Windows, ``type``) permet d'afficher le contenu d'un fichier, en l'occurrence ici du texte. *Dataset* et *DataFrame* ======================== Un *Dataset* est une collection distribuée de données. Il peut être vu comme une évolution conceptuelle des RDD (*Resilient Distributed Datasets*), historiquement la première structure de données distribuée employée par Spark. Un *DataFrame* est un *Dataset* organisé en colonnes qui portent des noms, comme les tables d'une base de données. Avec l'interface de programmation en Scala, le type ``DataFrame`` est simplement l'alias du type ``Dataset[Row]``. Il est utile de lire `ces explications préparées par les créateurs de Spark `_ pour mieux comprendre l'intérêt de chacune des interfaces de programmation (API) *RDD* et *Dataset / Dataframe*. Il est possible d'appliquer aux *Datasets* des **actions**, qui produisent des valeurs, et des **transformations**, qui produisent de nouveaux *Datasets*, ainsi que certaines fonctions qui n'entrent dans aucune de ces deux catégories. Quelques **actions** simples : .. code-block:: scala texteSophie.count() // nombre de lignes dans ce Dataset .. code-block:: scala texteSophie.first() // première ligne de ce Dataset .. code-block:: scala texteSophie.take(5) // 5 premières lignes de ce Dataset .. code-block:: scala texteSophie.collect() // retourne une partie du contenu de ce Dataset .. admonition:: Question : À quoi correspond une ligne de ce *Dataset* ? .. ifconfig:: tpscala in ('public') .. admonition:: Correction Une ligne de ce *Dataset* correspond à une ligne du fichier ``sophie.txt``. On peut aisément le vérifier en affichant les premières lignes du *Dataset* : .. code-block:: scala texteSophie.show(5) L'exécution d'une action comme ``count`` sur un très grand *Dataset* se déroule de la manière suivante : le *Dataset* est composé de fragments, chacun stocké sur un nœud de calcul ; chaque fragment contient un (grand) nombre de lignes ; le programme *driver* de Spark envoie à chaque nœud le traitement à faire ; chaque nœud de calcul exécute le traitement sur le fragment local et transmet les résultats au *driver*. Pour d'autres actions (comme ``first``) seul un fragment est nécessaire donc un seul nœud est sollicité. Quelques **transformations** simples : * Construire un *Dataset* comprenant seulement les lignes de ``texteSophie`` qui contiennent « Poupée », retourner un tableau avec ses 2 premières lignes : .. code-block:: scala val lignesAvecPoupee = texteSophie.filter(line => line.contains("poupée")) lignesAvecPoupee.take(2) // tableau avec les 2 premières lignes de ce Dataset * Construire un *Dataset* composé des longueurs des lignes de ``texteSophie``, retourner un tableau avec ses 5 premières lignes : .. code-block:: scala val longueursLignes = texteSophie.map(l => l.length) longueursLignes.take(5) // tableau avec les 5 premières lignes de ce Dataset Dans ces exemples, les expressions ``l => l.contains("poupée")`` et ``l => l.length`` sont des définitions de « fonctions anonymes » (voir les éléments de Scala plus loin). Elles indiquent comment transformer chaque ligne du *Dataset* ; ``l`` est ainsi implicitement assimilé à une ligne générique du *Dataset*. Il est possible d'utiliser le nom de votre choix à la place de ``l``, comme ``ligne``, ``line``, ``x``, ``toto``, etc. L'exécution d'une transformation comme ``map`` sur un très grand *Dataset* se déroule de la manière suivante : le *Dataset* est composé de fragments, chacun stocké sur un nœud de calcul ; le programme *driver* de Spark envoie à chaque nœud le traitement à faire ; chaque nœud de calcul exécute le traitement sur le fragment local et conserve les résultats localement. **Enchaînements** de transformations et d'actions : .. admonition:: Question : Combien de lignes contiennent ``poupée`` ? Écrire la commande nécessaire dans ``spark-shell`` et vérifier dans un terminal (``bash``) Linux. .. ifconfig:: tpscala in ('public') .. admonition:: Correction .. code-block:: scala // .filter() permet de ne garder que les lignes vérifiant la condition désirée // .count() permet de conter le nombre de lignes texteSophie.filter(l => l.contains("poupée")).count() Vous pouvez vérifier ce résultat de différentes façons, par exemple en utilisant un éditeur de texte (KWrite sur les machines de TP) pour compter les lignes. Il est toutefois possible d'obtenir cette valeur directement depuis la ligne de commande : .. tabs:: Dans un terminal : .. group-tab:: Linux/MacOS .. code-block:: bash %%bash cd tpintro grep poupée sophie.txt | wc -l ``grep poupée sophie.txt`` renvoie les lignes du fichier d'entrée (ici ``sophie.txt`` envoyé par la commande précédente) qui contiennent le « motif » (*pattern*) ``poupée``. Le caractère ``|`` indique que la sortie de la commande qui précède ne doit pas être envoyée à la console mais transféré à la commande qui suit (c'est un enchaînement de commandes ou *pipe* Linux). Ces lignes sont donc envoyées vers la commande Linux ``wc -l`` qui affiche le nombre total de lignes du fichier reçu en entrée (ici, les lignes de ``sophie.txt`` contenant ``poupée``). .. group-tab:: Windows .. code-block:: bash %%bash cd tpintro findstr poupée sophie.txt | find /c /v "" ``findstr string file`` recherche la chaîne de caractères ``string`` dans le fichier ``file``. Le caractère ``|`` indique que la sortie de la commande qui précède ne doit pas être retourné à la console mais envoyé vers la commande qui suit (c'est un enchaînement de commandes ou *pipe*). ``find /c /v ""`` permet ensuite de compter le nombre de lignes du fichier reçu en entrée (ici, les lignes de ``sophie.txt`` contenant ``poupée``). .. admonition:: Question : Combien de caractères contient le fichier ? Utilisez un éditeur de texte ou bien la commande ``wc -m`` pour vérifier le résultat. Attention : la commande ``wc`` considère les sauts de ligne comme un caractère (mais pas Spark). .. ifconfig:: tpscala in ('public') .. admonition:: Correction .. code-block:: scala texteSophie.map(l => l.length).reduce((a, b) => a + b) **Important :** l'évaluation est « paresseuse » : les opérations sont effectuées seulement quand un résultat doit être retourné. Par exemple, dans la séquence .. code-block:: scala val texteSophie = spark.read.textFile("sophie.txt") val longueursLignes = texteSophie.map(l => l.length) longueursLignes.reduce((a, b) => a + b) les données ne sont pas chargées en mémoire après la première ligne et ``longueursLignes`` n'est pas construit immédiatement après la transformation ``map`` de la seconde ligne. Ce n'est qu'au moment où l'action ``reduce`` doit être exécutée que Spark partitionne les calculs à faire en tâches pour les différentes machines (et/ou cœurs) et chaque machine (et/ou cœur) exécute sa partie de ``map`` et de ``reduce``, avant de retourner la réponse au programme *driver* (qui contrôle l'exécution). En conséquence, lorsque ``spark-shell`` vous signale une erreur, il est possible que cette erreur ne provienne pas de la dernière instruction que vous avez écrite mais d'une instruction antérieure. Pour cela, lorsque vous mettez en place une chaîne de traitement en vous servant de ``spark-shell``, il est utile d'entrer de temps en temps des actions simples (comme ``longueursLignes.take(2)`` ici) pour identifier plus facilement les éventuelles erreurs. Examinons plus attentivement l'action ``reduce((a, b) => a + b)`` de l'exemple précédent. L'expression ``(a, b) => a + b`` est une définition de « fonction anonyme » (voir les éléments de Scala plus loin) qui indique comment transformer une paire de lignes du *Dataset* ; ``a`` et ``b`` sont ainsi implicitement assimilés à deux lignes génériques du *Dataset*. Il est possible d'utiliser le nom de votre choix à la place de ``a`` ou ``b``. Le déroulement de cette action est le suivant : chaque nœud, sur le fragment local du *Dataset*, remplace chaque paire de lignes par leur somme, ensuite applique de nouveau cette opération jusqu'à ce qu'il obtienne une seule valeur (qui sera dans ce cas la somme des valeurs de toutes les lignes) ; ces résultats sont transmis au *driver* qui les additionne pour obtenir la somme globale. .. admonition:: Note Si vous souhaitez supprimer les nombreuses lignes d'information qui précèdent la réponse (vous pouvez faire cela sur votre installation de Spark, pas sur celle de la salle de TP), dans le répertoire ``conf`` copiez ``log4j.properties.template`` en ``log4j.properties`` et, dans ce dernier fichier, ligne ``log4j.rootCategory=INFO``, remplacez ``INFO`` par ``WARN``. Dans une fenêtre terminal (``bash``), à ouvrir séparément de la fenêtre Spark, entrez les commandes : .. code-block:: bash cd /opt/spark/conf sudo cp log4j.properties.template log4j.properties sudo gedit log4j.properties Dans ``gedit``, ligne ``log4j.rootCategory=INFO``, remplacez ``INFO`` par ``WARN``. Pour rappel, ci-dessus ``$`` est le prompt système dans la fenêtre ``bash``. Actions ======= Dans les exemples précédents, ``count``, ``first``, ``take``, ``collect`` et ``reduce`` sont des exemples d'actions. Voici ci-dessous quelques actions parmi les plus utilisées. Consulter la documentation (`Scala `_) pour une liste complète et des spécifications détaillées. * ``reduce(func)`` | Agréger les éléments du *Dataset* en utilisant la fonction ``func`` (qui prend 2 arguments et retourne 1 résultat). La fonction devrait être associative et commutative pour être correctement calculée en parallèle. * ``collect()`` | Retourner toutes les lignes du *Dataset* comme un tableau au programme *driver*. À utiliser seulement si le *Dataset* a un volume faible (par ex., après des opérations de type ``filter`` **très** sélectives). * ``count()`` | Retourner le nombre de lignes du *Dataset*. * ``head(n)`` | Retourner un tableau avec les ``n`` premières lignes du *Dataset*. * ``take(n)`` | Alias pour ``head(n)``. * ``first()`` | Retourner la première ligne du *Dataset* (équivalent à ``take(1)`` ou ``head()``). * ``show(n)`` | Afficher les ``n`` premières lignes du *Dataset* sous forme de tableau. * ``foreach(func)`` | Appliquer la fonction ``func`` à toutes les lignes. Transformations et persistance ============================== Les transformations peuvent * produire un *Dataset* à partir d'un autre *Dataset* : ``map``, ``filter``, ``sort``, etc. * produire un *Dataset* à partir de deux *Dataset* : ``union``, ``join``, ``crossJoin``, etc. * produire 0 ou plusieurs *Dataset* à partir d'un *Dataset* : ``flatMap``. Un exemple avec ``flatMap`` : à partir de ``texteSophie``, obtenir un *Dataset* ``motstexteSophie`` ayant comme lignes les mots des différentes lignes de ``texteSophie`` et le rendre persistant pour faciliter des traitements ultérieurs. .. code-block:: scala texteSophie.flatMap(line => line.split(" ")).show() La méthode ``.split()`` permet de séparer une chaîne de caractères en une liste de mots séparés par un délimiteur. Ici, les lignes (du *Dataset* ``texteSophie``) sont divisées en mots séparés par des espaces. Il est possible d'utiliser plusieurs délimiteurs en les indiquant entre crochets (``[`` et ``]``), par exemple pour inclure séparer des mots accolés à une ponctuation. .. code-block:: scala val delimiteurs = "[ .;,?!--()_]" val motstexteSophie = texteSophie.flatMap(line => line.split(delimiteurs)) motstexteSophie.persist() ``motstexteSophie`` n'est pas calculé immédiatement (évaluation paresseuse) mais le sera au moment où un résultat devra être retourné. Avec ``.persist()`` (ou ``.cache()``), Spark cherchera à conserver ce *Dataset* en mémoire **une fois qu'il sera calculé**. .. code-block:: scala motstexteSophie.show() .. admonition:: Question : Comptez le nombre d'occurrences de chaque mot et affichez les 30 premières lignes résultantes. Indication pour la solution : ``.groupByKey(identity).count()`` permet de compter le nombre d'occurrences de chaque mot (les mots sont groupés par leur « identité », ensuite les occurrences comptées). .. ifconfig:: tpscala in ('public') .. admonition:: Correction .. code-block:: scala val motsOccurrences = motstexteSophie.groupByKey(identity).count() motsOccurrences.show(30) .. admonition:: Question : Retournez la valeur maximale et la moyenne du nombre de mots des lignes du fichier. .. ifconfig:: tpscala in ('public') .. admonition:: Correction .. code-block:: scala val nombresMotsParLignes = texteSophie.map(l => l.split(delimiteurs).size) nombresMotsParLignes.persist() // On cherche le nombre de mots maximum dans une ligne // À chaque comparaison entre la valeur la plus grande trouvée jusqu'ici et // la valeur actuelle, on garde le max des deux. val lMax = nombresMotsParLignes.reduce((a, b) => if (a > b) a else b) // On cherche le nombre moyen de mots. Pour ce faire, on somme le nombre de // mots de toutes les lignes pour obtenir le nombre total de mots puis on le // divise par le nombre de lignes (texteSophie.count()). val lAvg = nombresMotsParLignes.reduce((a, b) => a + b) / texteSophie.count() .. .. admonition:: Question : .. Triez les lignes en ordre alphabétique des mots, retournez les 10 premiers. .. .. ifconfig:: tpscala in ('public') .. .. admonition:: Correction .. .. code-block:: scala .. motstexteSophieUn.reduceByKey((a, b) => a + b).sortByKey(true).take(5) Bien entendu, il est possible d'enchaîner directement les opérations : .. code-block:: scala texteSophie.flatMap(line => line.split(delimiteurs)).groupByKey(identity).count().show(10) Noter que dans les fonctions anonymes comme ``line => line.split(" ")`` il est possible d'utiliser une autre notation pour les lignes des *Datasets* correspondants, par exemple ``a => a.split(" ")`` ; le mot « line » n'a pas de signification particulière pour le programme (il rappelle simplement au programmeur la signification des lignes de ces *Datasets*). Il est possible de décrire le schéma d'un *Dataset* (*DataFrame*) : .. code-block:: scala motstexteSophie.printSchema() Dans ce cas, une seule colonne est présente et se nomme « value » (nom par défaut). Cette colonne contient des valeurs de type *string* (chaînes de caractères, c'est-à-dire du texte). Il est possible d'obtenir un nouveau *DataFrame* comme résultat d'une requête SQL : .. code-block:: scala // Création d'une vue SQL temporaire "mots" motstexteSophie.createOrReplaceTempView("mots") // Application d'une requête SQL sur la vue temporaire val apresT = spark.sql("SELECT value FROM mots WHERE value > 't'") apresT.show() .. admonition :: Question : Que contient ``apresT`` ? .. ifconfig:: tpscala in ('public') .. admonition:: Correction ``apresT`` est un *DataFrame* contenant les mots qui viennent après la lettre `t` dans le dictionnaire (l'opérateur ``>`` entre des chaînes de caractères utilise l'ordre lexicographique). Voici maintenant quelques transformations parmi les plus utilisées. Consulter la documentation (`Scala `_) pour une liste complète et des spécifications détaillées. * ``map(func)`` | Retourne un nouveau *Dataset* obtenu en appliquant la fonction ``func`` à chaque ligne du *Dataset* de départ. * ``filter(func)`` | Retourne un nouveau *Dataset* obtenu en sélectionnant les lignes de la source pour lesquelles la fonction ``func`` retourne « vrai ». * ``flatMap(func)`` | Similaire à ``map`` mais chaque ligne du *Dataset* source peut être transformée en 0 ou plusieurs lignes ; retourne une séquence (``Seq``) plutôt qu'une seule ligne. | * ``sample(withReplacement,`` | Retourne un *Dataset* contenant une fraction aléatoire ``fraction`` du *Dataset* auquel la transformation s'applique, avec ou sans remplacement, avec une ``seed`` pour le générateur aléatoire. * ``union(otherDataset)`` | Retourne un *Dataset* qui est l'union des lignes du *Dataset* source et du *Dataset* argument (``otherDataset``). * ``intersection(otherDataset)`` | Retourne un *Dataset* qui est l'intersection des lignes du *Dataset* source et du *Dataset* argument (``otherDataset``). * ``distinct()`` | Retourne un *Dataset* qui est obtenu du *Dataset* source en éliminant les doublons des lignes. * ``groupByKey(func)`` | Pour un *Dataset*, calcule la clé par ``func`` et retourne un *KeyValueGroupedDataset*. * ``join(otherDataset, joinExprs,`` | Pour un *Dataset* jointure avec ``otherDataset`` de type ``joinType`` avec condition de jointure ``joinExprs``. * ``crossJoin(otherDataset)`` | Pour *Dataset* de type T et ``otherDataset`` de type U, retourne un *Dataset* de type (T, U) (produit cartésien). Création de *Dataset*, stockage de *Dataset* ============================================ Il y a plusieurs possibilités pour créer un *Dataset* : 1. À partir d'un RDD (*Resilient Distributed Dataset*) existant. 2. À partir de données externes. 3. Transformer un (ou plusieurs) *Dataset(s)* existant(s). `Plusieurs solutions `_ permettent d'obtenir un *Dataset / DataFrame* à partir d'un *RDD*. Le plus simple est de spécifier le schéma des données du *DataFrame* et ensuite d'appeler la méthode ``.createDataFrame(rowRDD, schema)`` de ``SparkSession``. Afin d'obtenir un *RDD* à partir d'un *Dataset / DataFrame* il suffit d'appeler la méthode ``.rdd`` de ``Dataset`` / ``DataFrame``. Spark permet de créer un *Dataset* à partir de toute source de données acceptée par Hadoop (fichier local, HDFS, HBase, etc.). Quelques exemples : * ``SparkSession.read.textFile()`` : lit le fichier donné en paramètre (sous forme d'URI) et construit un *Dataset* dont les lignes sont les lignes du fichier texte ; * ``SparkSession.readStream...`` : construction de *Datasets* à partir d'un flux de données. .. sources : voir https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader .. note:: Si l'URI correspond à un fichier local, il faut s'assurer que le fichier est accessible avec le même URI aux *workers*. Toutes les méthodes Spark de création de *Dataset* à partir de fichiers peuvent fonctionner sur des répertoires, des fichiers compressés et utiliser des *wildcards*. Par exemple : ``textFile("/my/directory")``, ``textFile("/my/directory/*.txt")`` et ``textFile("/my/directory/*.gz")``. Dans la configuration actuelle de Spark dans la salle de travaux pratiques, c'est le système de fichiers (*filesystem*) **local** et non HDFS (distribué) qui est utilisé par défaut. Si vous entrez simplement ``.textFile("sophie.txt")``, Spark cherchera le fichier dans le système de fichiers par défaut. Avec le préfixe ``file://`` on indique que le fichier à lire est sur le système de fichiers local, avec le préfixe ``hdfs://`` on indique qu'il s'agit plutôt de HDFS. L'utilisation d'un de ces préfixes exige l'emploi du chemin absolu vers le fichier correspondant. Un *Dataset* peut être sauvegardé en utilisant ``Dataset.write...``, par exemple ``Dataset.write.text(path)`` sous format texte. ``path`` indique un répertoire du système de fichiers local (préfixe ``file://``), HDFS (préfixe ``hdfs://``) ou autre fichier supporté par Hadoop. ************************************** Scala : concepts de base avec exemples ************************************** Scala est à la fois un langage objet pur, dans lequel chaque valeur est un objet (contrairement à Java), et un langage fonctionnel, chaque fonction étant également une valeur (donc un objet). Pour référence et un éventuel apprentissage plus approfondi du langage : * `Documentation Scala `_ * `Livres sur Scala `_ Fonctions anonymes ================== Les « fonctions anonymes » sont très utiles dans la définition d'opérations simples à appliquer aux données. Les expressions vues plus haut, comme ``(a, b) => a + b``, ``(a, b) => if (a > b) a else b`` ou ``l => l.split(" ").size`` sont des définitions de fonctions anonymes. Les deux premières fonctions ont deux arguments chacune, la troisième fonction a un seul argument. La définition ``(a, b) => a + b`` est, pour des arguments qui sont des ``Int``, une version courte de la définition suivante : .. code-block:: scala new Function1[Int, Int, Int] { def apply(x: Int, y: Int): Int = x + y } (c'est une définition formelle, non directement interprétable, ne l'entrez pas au clavier) Il est possible de définir des fonctions anonymes avec plus de deux paramètres, par exemple ``(x: Int, y: Int, z: Int) => x + y - z``. Une telle fonction peut être appelée directement : .. code-block:: scala ((x: Int, y: Int, z: Int) => x + y - z)(1,2,3) // exemple appel fonction anonyme Enfin, une fonction anonyme peut n'avoir aucun paramètre, par exemple ``() => { System.getProperty("user.dir") }``, qui peut être appelée directement : .. code-block:: scala (() => { System.getProperty("user.dir") })() // exemple appel fonction anonyme .. note:: Dans les expressions vues plus tôt, comme ``(a, b) => a + b`` employée par exemple dans ``longueursLignes.reduce((a, b) => a + b)``, le type des arguments n'était pas explicitement indiqué car il était donné par le type des lignes qui composent le *Dataset* ``longueursLignes``. .. admonition:: Question : Séparez en mots la phrase « Stat Roma pristina nomine, nomina nuda tenemus » à l'aide d'une fonction anonyme. .. ifconfig:: tpscala in ('private') .. only:: jupyter .. code-block:: scala .. ifconfig:: tpscala in ('public') .. admonition:: Correction On commence par construire une fonction anonyme qui prend une chaîne de caractères (``String``) en entrée et la découpe (méthode ``.split()``). .. code-block:: scala (l: String) => l.split(" ") Puis on applique cette fonction anonyme sur la phrase choisie : .. code-block:: scala ((l:String) => l.split(" "))("Stat Roma pristina nomine, nomina nuda tenemus") Listes en compréhension (*Sequence Comprehensions*) =================================================== Les « listes en compréhension » sont des listes dont le contenu est défini par filtrage du contenu d'une autre liste, contrairement à l'approche plus classique de la construction de liste par énumération de ses éléments. Scala propose une notation facile pour définir des listes par compréhension : ``for (enumerators) yield e``, où ``enumerators`` est une liste de *enumerators* séparés par des ";". Un *enumerator* est soit un générateur qui introduit une ou plusieurs nouvelles variables, soit un filtre. Cette construction évalue l'expression ``e`` pour chaque association (*binding*) générée par les *enumerators* et retourne une séquence de ces valeurs. Voici un exemple : .. code-block:: scala def odd(debut: Int, fin: Int) = for (i <- debut until fin if i % 2 != 0) yield i println(odd(0, 20)) L'expression ``for`` dans la définition de la fonction ``odd`` introduit une nouvelle variable ``i: Int`` qui prend ensuite toutes les valeurs générées par l'itérateur ``_ until _`` qui passent le test ``if i % 2 != 0`` (ce test élimine les valeurs paires). Ici l'expression ``e`` de la syntaxe générale est réduite à la variable ``i``. L'expression ``for`` retourne donc un tableau avec les nombres impairs entre ``debut`` et ``fin - 1``. .. admonition:: Question À l'aide d'une liste en compréhension, produire une liste des carrés des nombres entiers entre 5 et 15, **sauf 12**. .. ifconfig:: tpscala in ('private') .. only:: jupyter .. code-block:: scala .. ifconfig:: tpscala in ('public') .. admonition:: Correction Pas à pas, on peut commencer par produire tous les nombres entiers entre 5 et 15 : .. code-block:: scala for (i <- 5 until 16) yield i Il faut ensuite modifier le résultat du ``yield`` pour obtenir les carrés : .. code-block:: scala for (i <- 5 until 16) yield i * i On ajoute la condition pour éliminer le cas ``i == 12`` : .. code-block:: scala for (i <- 5 until 16 if i != 12) yield i * i