Travaux pratiques - Manipulation de données numériques. Exécution d’applications

(la version précédente de cette séance, utilisant l’API RDD)

Références externes utiles :

Les exemples ci-dessous reprennent, en partie, ceux des documentations en ligne.

Données numériques dans MLlib

MLlib comporte deux interfaces de programmation (API) différentes, une plus ancienne (org.apache.spark.mllib) écrite pour travailler avec des RDD et une plus récente (org.apache.spark.ml) qui travaille avec les Dataset / DataFrame. Nous nous servirons principalement de l’API la plus récente.

Les deux API proposent des vecteurs et des matrices locales (stockés sur un seul nœud de calcul). L’API basée sur les RDD propose des matrices distribuées sous forme de RDD. Dans l’API basée sur les Dataset / DataFrame, ce sont des DataFrame ou des (groupes de) colonnes de DataFrame qui constituent les matrices distribuées. Mélanger les deux API doit être fait avec précaution.

Il faut noter que les opérations d’algèbre linéaire ne sont pas proposées explicitement mais sont obtenues en se basant sur les librairies Breeze et jblas. Vous pouvez trouver ici un bref exemple d’utilisation explicite.

Vecteurs locaux, matrices locales

Un vecteur local a des indices de type Int (dont les valeurs commencent à 0) et des valeurs de type Double. MLlib propose des représentations denses ou creuses pour les vecteurs. Un vecteur dense emploie un seul tableau (array) de valeurs. Pour les vecteurs creux, deux tableaux sont mis en correspondance, un d’indices et l’autre de valeurs. Nous examinons ici quelques définitions de vecteurs denses et de vecteurs creux. Pour les essayer, ouvrez une fenêtre terminal et entrez

$ mkdir tpnum
$ cd tpnum
$ spark-shell

Ensuite, dans spark-shell,

scala> import org.apache.spark.ml.linalg.Vectors

// Créer un vecteur dense (1.5, 0.0, 3.5)
scala> val vectDense = Vectors.dense(1.5, 0.0, 3.5)

// Créer un vecteur dense de dimension 3 avec toutes les composantes 0.0
scala> val vectZeros = Vectors.zeros(3)

// Créer un vecteur creux (1.5, 0.0, 3.5) en indiquant les indices et valeurs
//   correspondant aux composantes non nulles
scala> val vectCreux1 = Vectors.sparse(3, Array(0, 2), Array(1.5, 3.5))
// Créer un vecteur creux (1.5, 0.0, 3.5) en indiquant la séquence des
//   paires (indice, valeur) pour les composantes non nulles
scala> val vectCreux2 = Vectors.sparse(3, Seq((0, 1.5), (2, 3.5)))

// Accès à une valeur
scala> println(vectCreux1(1))
scala> println(vectCreux1(2))

// Test d'égalité entre (contenus des) vecteurs
scala> vectCreux1.equals(vectCreux2)
scala> vectCreux1.equals(vectDense)
scala> vectCreux1.equals(vectZeros)

// Taille des vecteurs
scala> vectDense.size    // la méthode size n'a pas d'arguments
scala> vectZeros.size
scala> vectCreux1.size
scala> vectCreux2.size

// Copie profonde d'un vecteur
scala> val vectCreux3 = vectCreux1.copy    // la méthode copy n'a pas d'arguments

Il est nécessaire d’importer explicitement org.apache.spark.ml.linalg.Vectors car Scala importe par défaut scala.collection.immutable.Vector.

Il est possible de calculer directement une norme pour un vecteur local, ainsi que le carré de la distance L2 entre deux vecteurs :

// Norme L1
scala> Vectors.norm(vectDense, 1)
scala> Vectors.norm(vectCreux1, 1)

// Norme L2
scala> Vectors.norm(vectDense, 2)
scala> Vectors.norm(vectCreux1, 2)

// Carré de la distance L2 entre deux vecteurs
scala> Vectors.sqdist(vectDense, vectZeros)
scala> Vectors.sqdist(vectDense, vectCreux1)

Une matrice locale a des indices de type Int dont la valeur commence à 0 pour les lignes et pour les colonnes, et des valeurs de type Double. MLlib propose des matrices locales denses, classe DenseMatrix, et creuses, classe SparseMatrix. Les valeurs des éléments d’une matrice dense sont gardées dans un tableau unidimensionnel en concaténant les colonnes successives de la matrice. Un exemple de création d’une matrice dense à 3 lignes et 2 colonnes :

scala> import org.apache.spark.ml.linalg.{Matrix, DenseMatrix, SparseMatrix}

// Créer une matrice dense ((1.2, 2.3), (3.4, 4.5), (5.6, 6.7))
scala> val matDense = new DenseMatrix(3, 2, Array(1.2, 3.4, 5.6, 2.3, 4.5, 6.7))

Les matrices creuses sont représentées dans un format Compressed Sparse Column (CSC).

Question :

Calculez dans un vecteur vectProduit le produit entre matDense et un vecteur dense de composantes (1.0, 2.0). Pour cela, cherchez la méthode à utiliser dans la documentation en ligne de l’API en Scala (entrez DenseMatrix dans l’espace de recherche, ensuite choisissez la bonne classe et examinez ses méthodes)).

Correction

scala> val vD = Vectors.dense(1.0, 2.0)
scala> val vectProduit = matDense.multiply(vD)  // ou matDense multiply vD

Il est souvent utile de construire explicitement un Dataset / DataFrame de petite taille pour tester une méthode de traitement. L’exemple ci-dessous montre cette possibilité :

scala> import org.apache.spark.ml.linalg.Vectors  // si ce n'est déjà fait

scala> val donnees = Array(Vectors.sparse(3, Seq((0, 1.0))), Vectors.dense(0.0, 1.0, 0.0), Vectors.dense(0.0, 0.0, 1.0))
scala> val donneesdf = spark.createDataFrame(donnees.map(Tuple1.apply)).toDF("vecteurs")
scala> donneesdf.show()

Lecture des données numériques

La création de Dataset / DataFrame à partir de fichiers contenant des données numériques peut être faite suivant plusieurs approches. Pour pouvoir examiner ces différentes approches nous chercherons d’abord quelques fichiers de données. Ouvrez une seconde fenêtre terminal et entrez les commandes suivantes :

$ cd tpnum
$ mkdir data
$ cd data
$ wget http://cedric.cnam.fr/vertigo/Cours/RCP216/docs/geysers.txt
$ wget http://cedric.cnam.fr/vertigo/Cours/RCP216/docs/geyser.txt
$ wget http://cedric.cnam.fr/vertigo/Cours/RCP216/docs/geyser.csv
$ wget https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_libsvm_data.txt --no-check-certificate

La première approche consiste à créer directement le Dataset / DataFrame à partir du fichier. Examinons d’abord cette approche pour un fichier de format CSV (comma separated values), geyser.csv, qui présente deux valeurs double par ligne, séparées par une virgule :

// Création directe de DataFrame à partir d'un fichier de données en format CSV
scala> val geysercsvdf = spark.read.format("csv").load("data/geyser.csv")

// examinons le DataFrame geysercsvdf obtenu
scala> geysercsvdf.printSchema()
scala> geysercsvdf.show(5)

Ce sont les méthodes format et load de DataFrameReader , accessibles à partir de SparkSession.read, qui sont employées ici.

Question :

Définissez un en-tête avec un nom pour chaque colonne dans le fichier CSV et indiquez que cet en-tête doit être pris en compte lors de la lecture.

Réponse

Il suffit de spécifier cela comme une option en insérant .option("header",true) dans l’ordre de lecture (par défaut c’est .option("header",false), il n’y a pas d’en-tête dans le fichier CSV) :

scala> val testdf = spark.read.format("csv").option("header",true).load("data/geyser.csv")

Question :

Toujours à l’aide des options, créez un DataFrame à partir du fichier “data/geyser.txt” dans lequel les séparateurs sont des espaces (” ”).

Réponse

L’option correspondante est .option(""sep"," ") dans l’ordre de lecture (par défaut c’est .option("sep",","), séparateur virgule) :

scala> val testdf = spark.read.format("csv").option("sep"," ").load("data/geyser.txt")

Un deuxième exemple suivant cette même approche concerne la lecture de vecteurs étiquettés (labeled point) dans un format popularisé par LIBSVM (le même format est utilisé par LIBLINEAR). C’est un format texte dans lequel chaque ligne représente un vecteur étiquetté creux dans le format suivant label index1:value1 index2:value2 ..., où les indices (>= 1) sont en ordre croissant. Après chargement, les indices sont convertis pour être >= 0. SparkSession.read.format("libsvm").load() permet la lecture de fichiers suivant ce format et la création d’un RDD contenant ces données :

// Création directe de DataFrame à partir d'un fichier de données en format LIBSVM
scala> val libsvmdf = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")

// examinons le DataFrame libsvmdf obtenu
scala> libsvmdf.printSchema()
scala> libsvmdf.show(5)

// Calcul de statistiques pour la colonne « label »
scala> libsvmdf.describe("label").show()

Question : pourquoi il n’est pas possible d’obtenir avec .describe() des statistiques pour la colonne « features » ?

Il faut noter que parmi les autres formats directement accessibles avec SparkSession.read.format() on peut trouver json, text ou parquet.

Une seconde approche pour la création de Dataset / DataFrame à partir de fichiers contenant des données numériques consiste à passer par l’intermédiaire d’un RDD. D’abord, un RDD est obtenu à partir du fichier texte geysers.txt représentant, dans un format adapté à la fonction de lecture .loadVectors(), 272 observations (1 observation par ligne) pour 2 variables (chaque variable est représentée par une colonne) :

scala> import org.apache.spark.mllib.util.MLUtils
scala> import org.apache.spark.rdd.RDD

scala> val lignes = MLUtils.loadVectors(sc, "data/geysers.txt")
scala> lignes.count()

Ce RDD est ensuite utilisé pour obtenir un DataFrame :

scala> import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.types._

// Définition du schéma du DataFrame comme une chaîne de caractères
scala> val chaineSchema = "val1 val2"

// Construction du schéma à partir de la chaîne de caractères
scala> val champs = chaineSchema.split(" ").map(nomChamp => StructField(nomChamp, DoubleType, true))
scala> val schema = StructType(champs)

// Construction d'un RDD[Row] à partir du RDD[Vector]
scala> val rowRDD = lignes.map(ligne => Row(ligne(0),ligne(1)))

// Construction du DataFrame à partir du RDD[Row]
scala> val lignesdf = spark.createDataFrame(rowRDD, schema)

// Examen du DataFrame obtenu
scala> lignesdf.printSchema()
scala> lignesdf.show(5)

// Calcul de statistiques pour les deux colonnes du DataFrame
scala> lignesdf.describe("val1").show()
scala> lignesdf.describe("val2").show()

Pour obtenir un DataFrame (le même) via un RDD à partir du fichier geyser.txt dont le format n’est pas adapté à la fonction de lecture loadVectors nous procédons de la manière suivante :

scala> import org.apache.spark.ml.linalg.Vectors

scala> val donnees = sc.textFile("data/geyser.txt")
scala> val lignes2 = donnees.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
scala> lignes2.count()

Question :

Expliquer les opérations réalisées pour obtenir le RDD lignes2.

Réponse

Le fichier geyser.txt est lu dans un RDD donnees de type texte, dont chaque élément (item) est une ligne de texte. Vous pouvez visualiser le contenu du fichier geyser.txt en entrant, dans une fenêtre terminal, more /home/cloudera/tpnum/data/geyser.txt. Ensuite, lors de la création du RDD lignes2, les opérations suivantes ont lieu : d’abord, chaque ligne de texte (chaque item de donnees), désignée implicitement par s, qui est une chaîne de caractères, est partitionnée en “mots” séparés par des espaces avec s.split(' ') pour produire un “tableau” (Array, non explicitement indiqué) de “mots” (chaînes de caractères). Chaque élément de ce tableau (chaque “mot”), correspondant à une séquence de caractères qui représente une valeur numérique (visualisez le fichier geyser.txt), est converti en cette valeur numérique en double précision par .map(_.toDouble). On remarque qu’une méthode map peut être appliquée aussi sur un Array (entre autres) et non seulement sur un RDD. On obtient ainsi un “tableau” (Array, non explicitement indiqué) à partir duquel est créé un vecteur dense par Vectors.dense(), qui sera un item du RDD lignes2.

La définition préalable du schéma du DataFrame permet de spécifier les formats des différentes colonnes grâce à la méthode DataFrameReader.schema(). Par exemple, avec le schema défini plus haut :

scala> val geysercsvdfd = spark.read.format("csv").schema(schema).load("data/geyser.csv")
scala> geysercsvdfd.printSchema()
scala> geysercsvdfd.show(5)

// Calcul de statistiques pour toutes les colonnes numériques
scala> geysercsvdfd.describe().show()

Le même résultat peut être obtenu dans cet exemple à l’aide de DataFrameReader.option("inferSchema",true), au prix d’une passe supplémentaire sur les données lues (ce qui est coûteux lorsque le volume de données est élevé).

scala> val testdfd = spark.read.format("csv").option("inferSchema",true).load("data/geyser.csv")
scala> testdfd.printSchema()
scala> testdfd.show(5)

Ecrire et exécuter une application dans Spark

Jusqu’ici nous avons utilisé Spark en transmettant, à travers spark-shell, une par une des instructions écrites en Scala. Ce mode de fonctionnement est adapté à l’exécution d’opérations simple et lors de l’étape de mise au point de programmes. Il est toutefois nécessaire de pouvoir écrire des programmes pour Spark (en Scala, dans le cadre de nos TP), de les corriger ou modifier éventuellement et de les lancer en exécution.

Ecriture de l’application

Nous ferons d’abord une copie locale d’un fichier HTML qui servira de source de données et préparerons la structure des répertoires nécessaires. Pour cela, dans une nouvelle fenêtre terminal entrez les commandes suivantes :

$ cd tpnum/data
$ wget http://cedric.cnam.fr/vertigo/Cours/RCP216/tpSparkScala.html
$ cd ..
$ mkdir -p src/main/scala

Lancez un éditeur de texte (KWrite par exemple, dans le menu Applications_pedagogiques, sous-menu Editeurs) et copiez dans cet éditeur le programme Scala suivant :

/* SimpleProg.scala */
import org.apache.spark.sql.SparkSession

object SimpleProg {
  def main(args: Array[String]) {
    val myFile = "data/tpSparkScala.html"
    val spark = SparkSession.builder.appName("Application Simple").getOrCreate()
    val myData = spark.read.textFile(myFile).cache()
    val nbclass = myData.filter(line => line.contains("class")).count()
    val nbjavascript = myData.filter(line => line.contains("javascript")).count()
    println("Lignes avec javascript : %s, lignes avec class : %s".format(nbjavascript, nbclass))
    spark.stop()
  }
}

Enregistrez-le dans un fichier SimpleProg.scala dans le répertoire ~/tpnum/src/main/scala. Ici ~/ indique votre répertoire $HOME (que vous pouvez connaître avec la commande linux echo $HOME).

Ce programme très simple compte le nombre de lignes du fichier tpSparkScala.html qui contiennent « javascript » et le nombre de lignes qui contiennent « class ». Lors de l’utilisation de spark-shell un objet SparkSession était créé automatiquement. Lorsqu’on écrit une application autonome il est nécessaire d’initialiser un objet SparkSession explicitement. Pour cela, il est nécessaire d’appeler SparkSession.builder.

Question : quel est l’intérêt de l’utilisation de .cache() pour le Dataset myData ?

Le programme dépendant de l’API Spark, nous préparerons aussi un programme de configuration pour SBT (Simple build Tool), simpleprog.sbt, qui indique les dépendances nécessaires. Pour cela, lancez l’éditeur de texte et copiez dans l’éditeur les lignes suivantes (attention, les lignes vides de séparation sont nécessaires !) :

name := "Simple Programme"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"

Il faut indiquer les versions de Scala et de Spark qui sont installées sur votre système (ici 2.11.8 et respectivement 2.2.0). Enregistrez ce texte dans un fichier simpleprog.sbt dans le répertoire ~/tpnum

Si SBT n’est pas présent sur votre système, il est nécessaire de l’installer (voir http://www.scala-sbt.org). SBT est déjà installé dans les salles de travaux pratiques au Cnam.

Il est maintenant possible de créer avec SBT le .jar contenant le programme. Dans une fenêtre terminal (shell), entrez les commandes suivantes :

$ cd tpnum
$ sbt package

Durant les env. 5 minutes nécessaires à SBT pour la création du premier .jar (ce sera plus rapide pour les suivants) vous pouvez poursuivre la lecture du support de TP.

Lancement en exécution avec spark-submit

Dès que SBT a terminé la création de simple-programme_2.11-1.0.jar (affichage de [info] Packaging {..}/{..}/target/scala-2.10/simple-programme_2.11-1.0.jar) il est possible de lancer en exécution l’application avec spark-submit. Le script spark-submit permet d’utiliser tous les cluster managers que Spark sait gérer, de façon transparente à l’application. Dans une fenêtre terminal (bash), dans le répertoire ~/tpnum, entrez :

$ spark-submit --class "SimpleProg" --master local target/scala-2.11/simple-programme_2.11-1.0.jar

si tout se passe bien, vous devriez obtenir la réponse

...
Lignes avec javascript : 5, lignes avec class : 231
Les options transmises à spark-submit sont :
  • --class : le point d’entrée pour l’application, ici SimpleProg ;
  • --master : l’URL du master sur le cluster, ici local qui demande l’exécution de l’application en local sur un seul cœur (local[2] : sur 2 cœurs, local[*]: sur tous les cœurs disponibles).

D’autres options sont souvent utilisées :

$ spark-submit \
    --class <main-class>
    --master <master-url> \
    --deploy-mode <deploy-mode> \
    --conf <key>=<value> \
    ... # autres options
    <application-jar> \
    [arguments de l'application]
avec
  • --deploy-mode : s’il faut faire tourner le driver sur un des nœuds du cluster (cluster) ou localement sur un client externe (client) (par défaut client) ;
  • --conf : propriétés de configuration de Spark en format “clé=valeur” ;
  • application-jar : chemin vers un .jar “empaqueté” (bundled) contenant l’application et toutes les dépendances et obtenu avec SBT ; l’URL doit être visible à partir de chaque nœud du cluster ;
  • application-arguments : si nécessaire, arguments à passer à la méthode main de l’application.

La totalité des options, dont certaines spécifiques au cluster manager employé, peuvent être obtenues avec spark-submit --help. Aussi, des informations détaillées sur le processus de lancement et sur l’exécution peuvent être obtenues en ajoutant l’option --verbose lors du lancement de spark-submit.

Des informations de configuration sont également présentes dans SPARK_HOME/conf/spark-defaults.conf (si le fichier spark-defaults.conf n’existe pas, il faut le créer à partir de spark-defaults.conf.template), mais sont moins prioritaires que les options indiquées en ligne de commande lors du lancement de spark-submit.