Travaux pratiques - Manipulation de données numériques. Exécution d’applications

(la version précédente de cette séance, utilisant l’API RDD)

Références externes utiles :

L’objectif de cette séance de TP est d’introduire les structures de données locales et distribuées employées pour des données numériques, ainsi que la méthode permettant d’écrire des programmes pour Spark en langage Scala et de les lancer en exécution.

Les exemples ci-dessous reprennent, en partie, ceux des documentations en ligne.

Nous vous recommandons d’effectuer cette séance de travaux pratiques avec Apache Zeppelin afin de vous familiariser avec cet environnement de programmation pour Spark. Pour ce faire, téléchargez le cahier Zeppelin en cliquant sur le bandeau en haut du cours (clic droit > Enregistrer la cible du lien sous) puis importez ce fichier (Import note) dans Zeppelin.

Si vous êtes inscrit au Cnam, vous avez accès au serveur JupyterHub en passant par votre espace numérique de formation. Une fois sur la page du cours, cliquez sur le lien « Accès à JupyterHub ». Vous serez automatiquement authentifié sur votre espace Jupyter personnel hébergé sur les serveurs du Cnam. Votre répertoire personnel est préservé tant que vous êtes inscrit à au moins un cours.

Une fois sur cette interface, vous pouvez y importer la version Jupyter du TP que vous aurez préalblement téléchargée en cliquant sur le bandeau en haut de cette page. Lorsque cela vous est demandé, choisissez le noyau « Apache Toree - Scala ».

Données numériques dans MLlib

MLlib est la bibliothèque de Spark pour l’apprentissage automatique (machine learning). MLlib comporte deux interfaces de programmation (API) différentes, une plus ancienne (org.apache.spark.mllib) écrite pour travailler avec des RDD et une plus récente (org.apache.spark.ml) qui travaille avec les Dataset / DataFrame. Nous nous servirons principalement de l’API la plus récente, c’est-à-dire org.apache.spark.ml.

Les deux API proposent des vecteurs et des matrices locales (stockés sur un seul nœud de calcul). L’API basée sur les RDD propose des matrices distribuées sous forme de RDD. Dans l’API basée sur les Dataset / DataFrame, ce sont des DataFrame ou des (groupes de) colonnes de DataFrame qui constituent les matrices distribuées. Mélanger les deux API doit être fait avec précaution.

Il faut noter que les opérations d’algèbre linéaire ne sont pas proposées explicitement mais sont obtenues en se basant sur les librairies Breeze et jblas. Vous pouvez trouver ici un bref exemple d’utilisation explicite.

Vecteurs locaux, matrices locales

Un vecteur local a des indices de type Int (dont les valeurs commencent à 0) et des valeurs de type Double. MLlib propose des représentations denses ou creuses (sparse) pour les vecteurs. Un vecteur dense emploie un seul tableau (array) de valeurs. Pour les vecteurs creux, deux tableaux sont mis en correspondance, un d’indices et l’autre de valeurs. Nous examinons ici quelques définitions de vecteurs denses et de vecteurs creux. Pour les essayer, entrez

// Importe la bibliothèque de calcul vectoriel de Spark
import org.apache.spark.ml.linalg.Vectors

// Créer un vecteur dense (1.5, 0.0, 3.5)
val vectDense = Vectors.dense(1.5, 0.0, 3.5)

// Créer un vecteur dense de dimension 3 avec toutes les composantes 0.0
val vectZeros = Vectors.zeros(3)

// Créer un vecteur creux (1.5, 0.0, 3.5) en indiquant les indices et valeurs
//   correspondant aux composantes non nulles
val vectCreux1 = Vectors.sparse(3, Array(0, 2), Array(1.5, 3.5))
// Créer un vecteur creux (1.5, 0.0, 3.5) en indiquant la séquence des
//   paires (indice, valeur) pour les composantes non nulles
val vectCreux2 = Vectors.sparse(3, Seq((0, 1.5), (2, 3.5)))

Note

Les vecteurs denses stockent l’intégralité des valeurs dans un tableau. Si ce vecteur contient beaucoup de zéros, ce format est assez inefficace (il faut stocker n valeurs en mémoire pour un vecteur de longueur n).

Les vecteurs creux ne stockent en mémoire que les valeurs non-nulles et leurs indices. Par exemple le vecteur creux (0., 1.5, 3.0, 0., 0., 0., 0., 1.0, 0., 0., 0., 0.) est représenté par deux listes :

  • la liste des valeurs non-nulles (1.5, 3.0, 1.0),

  • la liste des positions des valeurs non-nulles (1, 2, 7) (les indices où se trouvent les valeurs).

Cela permet de ne stocker que 2k valeurs en mémoire (les éléments non-nuls et leurs indices) pour un vecteur de longueur n. Si un vecteur ou une matrice a plus de 50% d’éléments nuls, il est intéressant de les stocker au format creux.

On peut accéder aux éléments du vecteur en utilisant les parenthèses :

// Accès à une valeur
println(vectCreux1(1))
println(vectCreux1(2))

La méthode .equals() permet de tester l’égalité entre vecteurs, y compris représentés sous des formats différents.

// Test d'égalité entre (contenus des) vecteurs
println(vectCreux1.equals(vectCreux2))
println(vectCreux1.equals(vectDense))
println(vectCreux1.equals(vectZeros))

L’attribut .size permet d’accéder à la taille du vecteur.

// Taille des vecteurs
println(vectDense.size)    // la méthode size n'a pas d'arguments
println(vectZeros.size)
println(vectCreux1.size)
println(vectCreux2.size)

L’attribut .copy permet de copier un vecteur.

// Copie profonde d'un vecteur
val vectCreux3 = vectCreux1.copy    // la méthode copy n'a pas d'arguments

Il est nécessaire d’importer explicitement org.apache.spark.ml.linalg.Vectors car Scala importe par défaut scala.collection.immutable.Vector.

Il est possible de calculer directement une norme pour un vecteur local, ainsi que le carré de la distance L2 entre deux vecteurs :

// Norme L1
Vectors.norm(vectDense, 1)
Vectors.norm(vectCreux1, 1)
// Norme L2
Vectors.norm(vectDense, 2)
Vectors.norm(vectCreux1, 2)
// Carré de la distance L2 entre deux vecteurs
Vectors.sqdist(vectDense, vectZeros)
Vectors.sqdist(vectDense, vectCreux1)

Question

Calculer avec Spark la distance euclidienne (L2) entre les points (0, 1, 2) et (-2, -1, 0).

Une matrice locale a des indices de type Int dont la valeur commence à 0 pour les lignes et pour les colonnes, et des valeurs de type Double. MLlib propose des matrices locales denses, classe DenseMatrix, et creuses, classe SparseMatrix. Les valeurs des éléments d’une matrice dense sont gardées dans un tableau unidimensionnel en concaténant les colonnes successives de la matrice. Un exemple de création d’une matrice dense à 3 lignes et 2 colonnes :

import org.apache.spark.ml.linalg.{Matrix, DenseMatrix, SparseMatrix}

// Créer une matrice dense ((1.2, 2.3), (3.4, 4.5), (5.6, 6.7))
val matDense = new DenseMatrix(3, 2, Array(1.2, 3.4, 5.6, 2.3, 4.5, 6.7))

Les matrices creuses sont représentées dans un format Compressed Sparse Column (CSC).

Question :

Calculez dans un vecteur vectProduit le produit entre matDense et un vecteur dense de composantes (1.0, 2.0). Pour cela, cherchez la méthode à utiliser dans la documentation en ligne de l’API en Scala (entrez DenseMatrix dans l’espace de recherche, ensuite choisissez la bonne classe et examinez ses méthodes)).

Il est souvent utile de construire explicitement un Dataset / DataFrame de petite taille pour tester une méthode de traitement. L’exemple ci-dessous montre cette possibilité :

import org.apache.spark.ml.linalg.Vectors  // si ce n'est déjà fait

val donnees = Array(Vectors.sparse(3, Seq((0, 1.0))), Vectors.dense(0.0, 1.0, 0.0), Vectors.dense(0.0, 0.0, 1.0))
val donneesdf = spark.createDataFrame(donnees.map(Tuple1.apply)).toDF("vecteurs")
donneesdf.show()

Lecture des données numériques

La création de Dataset / DataFrame à partir de fichiers contenant des données numériques peut être faite suivant plusieurs approches. Pour pouvoir examiner ces différentes approches nous chercherons d’abord quelques fichiers de données.

Sous Linux/MacOS, ouvrez une seconde fenêtre terminal et entrez les commandes suivantes :

# À entrer dans un terminal
cd tpnum # entrer dans le répertoire tpnum
mkdir data && cd data  # créer un sous-dossier data et y rentrer
# Téléchargements des fichiers
wget -nc http://cedric.cnam.fr/vertigo/Cours/RCP216/docs/geysers.txt
wget -nc http://cedric.cnam.fr/vertigo/Cours/RCP216/docs/geyser.txt
wget -nc http://cedric.cnam.fr/vertigo/Cours/RCP216/docs/geyser.csv
wget -nc https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_libsvm_data.txt

Sous Windows, créez un répertoire tpnum\data puis téléchargez-y les fichiers dont les URL sont spécifiées ci-dessus.

La première approche consiste à créer directement le Dataset / DataFrame à partir du fichier. Examinons d’abord cette approche pour un fichier de format CSV (comma separated values), geyser.csv, qui présente deux valeurs double par ligne, séparées par une virgule :

// Création directe de DataFrame à partir d'un fichier de données en format CSV
val geysercsvdf = spark.read.format("csv").load("data/geyser.csv")

// examinons le DataFrame geysercsvdf obtenu
geysercsvdf.printSchema()
geysercsvdf.show(5)

Ce sont les méthodes format et load de DataFrameReader , accessibles à partir de SparkSession.read, qui sont employées ici.

Question :

Pourquoi les colonnes sont-elles de type string ? Avec les options de la fonction read, indiquez lors de la lecture via Spark que l’en-tête du fichier CSV doit être pris en compte pour nommer les colonnes du DataFrame.

Indice : se référer aux exemples de la documentation.

Question :

Toujours à l’aide des options, créez un DataFrame à partir du fichier « data/geyser.txt » dans lequel les séparateurs sont des espaces (« »).

La méthode .describe() permet de produire des statistiques sur les colonnes numériques du DataFrame :

geysercsvdf.describe().show()

Un deuxième exemple suivant cette même approche concerne la lecture de vecteurs étiquettés (labeled point) dans un format popularisé par LIBSVM (le même format est utilisé par LIBLINEAR). C’est un format texte dans lequel chaque ligne représente un vecteur étiquetté creux dans le format suivant label index1:value1 index2:value2 ..., où les indices (>= 1) sont en ordre croissant. Après chargement, les indices sont convertis pour être >= 0. SparkSession.read.format("libsvm").load() permet la lecture de fichiers suivant ce format et la création d’un DataFrame contenant ces données :

// Création directe de DataFrame à partir d'un fichier de données en format LIBSVM
val libsvmdf = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")

// examinons le DataFrame libsvmdf obtenu
libsvmdf.printSchema()
libsvmdf.show(5)

// Calcul de statistiques pour la colonne « label »
libsvmdf.describe("label").show()

Question :

Pourquoi il n’est pas possible d’obtenir avec .describe() des statistiques pour la colonne « features » ?

Il faut noter que parmi les autres formats directement accessibles avec SparkSession.read.format() on peut trouver json, text ou parquet.

Une seconde approche pour la création de Dataset / DataFrame à partir de fichiers contenant des données numériques consiste à passer par l’intermédiaire d’un RDD. D’abord, un RDD est obtenu à partir du fichier texte geysers.txt représentant, dans un format adapté à la fonction de lecture .loadVectors(), 272 observations (1 observation par ligne) pour 2 variables (chaque variable est représentée par une colonne) :

import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

val lignes = MLUtils.loadVectors(sc, "data/geysers.txt")
lignes.count()

Ce RDD est ensuite utilisé pour obtenir un DataFrame :

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Définition du schéma du DataFrame comme une chaîne de caractères
val chaineSchema = "duration interval"

// Construction du schéma à partir de la chaîne de caractères
val champs = chaineSchema.split(" ").map(nomChamp => StructField(nomChamp, DoubleType, true))
val schema = StructType(champs)

// Construction d'un RDD[Row] à partir du RDD[Vector]
val rowRDD = lignes.map(ligne => Row(ligne(0),ligne(1)))

// Construction du DataFrame à partir du RDD[Row]
val lignesdf = spark.createDataFrame(rowRDD, schema)

// Examen du DataFrame obtenu
lignesdf.printSchema()
lignesdf.show(5)

// Calcul de statistiques pour les deux colonnes du DataFrame
lignesdf.describe("duration").show()
lignesdf.describe("interval").show()

Pour obtenir un DataFrame (le même) via un RDD à partir du fichier geyser.txt dont le format n’est pas adapté à la fonction de lecture loadVectors nous procédons de la manière suivante :

import org.apache.spark.ml.linalg.Vectors

val donnees = sc.textFile("data/geyser.txt")
val lignes2 = donnees.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
lignes2.count()

Question :

Expliquer les opérations réalisées pour obtenir le RDD lignes2.

La définition préalable du schéma du DataFrame permet de spécifier les formats des différentes colonnes grâce à la méthode DataFrameReader.schema(). Par exemple, avec le schema défini plus haut :

val geysercsvdfd = spark.read.format("csv").schema(schema).load("data/geyser.csv")
geysercsvdfd.printSchema()
geysercsvdfd.show(5)

// Calcul de statistiques pour toutes les colonnes numériques
geysercsvdfd.describe().show()

Le même résultat peut être obtenu dans cet exemple à l’aide de DataFrameReader.option("inferSchema",true), au prix d’une passe supplémentaire sur les données lues (ce qui est coûteux lorsque le volume de données est élevé).

val testdfd = spark.read.format("csv").option("inferSchema",true).load("data/geyser.csv")
testdfd.printSchema()
testdfd.show(5)

Écrire et exécuter une application dans Spark

Jusqu’ici nous avons utilisé Spark en transmettant une par une des instructions écrites en Scala à travers spark-shell ou Jupyter. Ce mode de fonctionnement est adapté à l’exécution d’opérations simple et lors de l’étape de mise au point de programmes. Il est toutefois nécessaire de pouvoir écrire des programmes pour Spark (en Scala, dans le cadre de nos TP), de les corriger ou modifier éventuellement et de les lancer en exécution.

Écriture de l’application

Nous ferons d’abord une copie locale d’un fichier HTML qui servira de source de données et préparerons la structure des répertoires nécessaires. Pour cela, dans une nouvelle fenêtre terminal entrez les commandes suivantes :

mkdir -p tpnum && cd tpnum/
mkdir -p data/ && cd data/
wget http://cedric.cnam.fr/vertigo/Cours/RCP216/tpSparkScala.html
cd ..
mkdir -p src/main/scala

Sous Windows, créez le répertoire data et téléchargez-y la page HTML spécifiée (par exemple en la sauvegardant depuis un navigateur), puis créez l’arborescence src/main/scala dans le répertoire tpnum.

Lancez un éditeur de texte (KWrite par exemple, dans le menu Applications_pedagogiques, sous-menu Editeurs) et copiez dans cet éditeur le programme Scala suivant :

/* SimpleProg.scala */
import org.apache.spark.sql.SparkSession

object SimpleProg {
  def main(args: Array[String]) {
    val myFile = "data/tpSparkScala.html"
    val spark = SparkSession.builder.appName("Application Simple").getOrCreate()
    val myData = spark.read.textFile(myFile).cache()
    val nbclass = myData.filter(line => line.contains("class")).count()
    val nbjavascript = myData.filter(line => line.contains("javascript")).count()
    println("Lignes avec javascript : %s, lignes avec class : %s".format(nbjavascript, nbclass))
    spark.stop()
  }
}

Enregistrez-le dans un fichier SimpleProg.scala dans le répertoire ~/tpnum/src/main/scala. Ici ~/ indique votre répertoire utilisateur ($HOME, que vous pouvez connaître avec la commande Linux echo $HOME).

Ce programme très simple compte le nombre de lignes du fichier tpSparkScala.html qui contiennent « javascript » et le nombre de lignes qui contiennent « class ». Lors de l’utilisation de spark-shell un objet SparkSession était créé automatiquement. Lorsqu’on écrit une application autonome il est nécessaire d’initialiser un objet SparkSession explicitement. Pour cela, il est nécessaire d’appeler SparkSession.builder.

Question :

Quel est l’intérêt de l’utilisation de .cache() pour le Dataset myData ?

Le programme dépendant de l’API Spark, nous préparerons aussi un programme de configuration pour SBT (Simple build Tool), simpleprog.sbt, qui indique les dépendances nécessaires. Pour cela, lancez l’éditeur de texte et copiez dans l’éditeur les lignes suivantes (attention, les lignes vides de séparation sont nécessaires !) :

/* Le nom de votre choix pour votre programme */
name := "Simple Programme"

/* Le numéro de version de votre choix pour votre programme */
version := "1.0"

/* La version de Scala installée sur la machine */
scalaVersion := "2.11.8"

/* Les dépendences à utiliser (ici, Apache Spark dans la version installée sur la machine) */
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"

Il faut indiquer les versions de Scala et de Spark qui sont installées sur votre système (ici 2.11.8 et respectivement 2.2.0). Enregistrez ce texte dans un fichier simpleprog.sbt dans le répertoire ~/tpnum. Le nom « Simple Programme » et la version de votre programme, ainsi que la version de Scala utlisée sont employés par SBT pour générer le nom du fichier .jar qui sera, dans le cas présent, simple-programme_2.11-1.0.jar.

Si SBT n’est pas présent sur votre système, il est nécessaire de l’installer (voir http://www.scala-sbt.org). SBT est déjà installé dans les salles de travaux pratiques au Cnam.

Il est maintenant possible de créer avec SBT le .jar contenant le programme. Dans une fenêtre terminal (shell), entrez les commandes suivantes :

cd ~/tpnum
sbt package

Si la commande sbt n’est pas trouvée, sur les ordinateurs de la salle de TP essayez alors plutôt /opt/sbt/bin/sbt package.

Durant les env. 5 minutes nécessaires à SBT pour la création du premier .jar (ce sera plus rapide pour les suivants) vous pouvez poursuivre la lecture du support de TP.

Si jamais vous obtenez une erreur du type ResolveException: unresolved dependency, c’est que vous n’avez pas spécifié la bonne version pour Scala ou pour Apache Spark dans le fichier de configuration simpleprog.sbt.

Lancement en exécution avec spark-submit

Dès que SBT a terminé la création de simple-programme_2.11-1.0.jar (affichage de [info] Packaging {..}/{..}/target/scala-2.10/simple-programme_2.11-1.0.jar) il est possible de lancer en exécution l’application avec spark-submit. Le script spark-submit permet d’utiliser tous les cluster managers que Spark sait gérer, de façon transparente à l’application. Dans une fenêtre terminal (bash), dans le répertoire ~/tpnum, entrez :

$ spark-submit --class "SimpleProg" --master local target/scala-2.11/simple-programme_2.11-1.0.jar

Si tout se passe bien, vous devriez obtenir la réponse suivante :

...
Lignes avec javascript : 1, lignes avec class : 248
...

Les options transmises à spark-submit sont :

  • --class : le point d’entrée pour l’application, ici SimpleProg ;

  • --master : l’URL du master sur le cluster, ici local qui demande l’exécution de l’application en local sur un seul cœur (local[2] : sur 2 cœurs, local[*]: sur tous les cœurs disponibles).

D’autres options sont souvent utilisées :

spark-submit \
    --class <main-class>
    --master <master-url> \
    --deploy-mode <deploy-mode> \
    --conf <key>=<value> \
    ... # autres options
    <application-jar> \
    [arguments de l'application]

avec

  • --deploy-mode : s’il faut faire tourner le driver sur un des nœuds du cluster (cluster) ou localement sur un client externe (client) (par défaut client) ;

  • --conf : propriétés de configuration de Spark en format « clé=valeur » ;

  • application-jar : chemin vers un .jar « empaqueté » (bundled) contenant l’application et toutes les dépendances et obtenu avec SBT ; l’URL doit être visible à partir de chaque nœud du cluster ;

  • application-arguments : si nécessaire, arguments à passer à la méthode main de l’application.

La totalité des options, dont certaines spécifiques au cluster manager employé, peuvent être obtenues avec spark-submit --help. Aussi, des informations détaillées sur le processus de lancement et sur l’exécution peuvent être obtenues en ajoutant l’option --verbose lors du lancement de spark-submit.

Des informations de configuration sont également présentes dans SPARK_HOME/conf/spark-defaults.conf (si le fichier spark-defaults.conf n’existe pas, il faut le créer à partir de spark-defaults.conf.template), mais sont moins prioritaires que les options indiquées en ligne de commande lors du lancement de spark-submit.