Travaux pratiques - Manipulation de données numériques. Exécution d’applications

(la nouvelle version de cette séance, utilisant l’API DataFrame)

Références externes utiles :

Les exemples ci-dessous reprennent, en partie, ceux des documentations en ligne.

Données numériques dans MLlib

MLlib propose des vecteurs locaux et matrices locales (stockés sur un seul nœud de calcul), ainsi que des matrices distribuées sous forme de RDD. Les opérations d’algèbre linéaire se basent sur les librairies Breeze et jblas.

Vecteurs locaux, matrices locales

Un vecteur local a des indices de type Int dont la valeur commence à 0 et des valeurs de type Double. MLlib propose des vecteurs denses (classe DenseVector) ou creux (classe SparseVector). Un vecteur dense emploie un seul tableau (array) de valeurs. Pour les vecteurs creux deux tableaux sont mis en correspondance, un d’indices et l’autre de valeurs. Voici quelques définitions de vecteurs denses et de vecteurs creux. Pour les essayer, ouvrez une fenêtre terminal et entrez

$ mkdir tpnum
$ cd tpnum
$ spark-shell

Ensuite, dans spark-shell,

scala> import org.apache.spark.mllib.linalg.Vectors

// Créer un vecteur dense (1.5, 0.0, 3.5)
scala> val vectDense = Vectors.dense(1.5, 0.0, 3.5)

// Créer un vecteur dense de dimension 100 avec toutes les composantes 0.0
scala> val vectZeros = Vectors.zeros(100)

// Créer un vecteur creux (1.5, 0.0, 3.5) en indiquant les indices et valeurs
//   correspondant aux composantes non nulles
scala> val vectCreux1 = Vectors.sparse(3, Array(0, 2), Array(1.5, 3.5))
// Créer un vecteur creux (1.5, 0.0, 3.5) en indiquant la séquence des
//   paires (indice, valeur) pour les composantes non nulles
scala> val vectCreux2 = Vectors.sparse(3, Seq((0, 1.5), (2, 3.5)))

// Accès à une valeur
scala> println(vectCreux1(1))
scala> println(vectCreux1(2))

// Test d'égalité entre (contenus des) vecteurs
scala> vectCreux1.equals(vectCreux2)
scala> vectCreux1.equals(vectDense)
scala> vectCreux1.equals(vectZeros)

// Taille des vecteurs
scala> vectDense.size    // la méthode size n'a pas d'arguments
scala> vectZeros.size
scala> vectCreux1.size
scala> vectCreux2.size

// Copie profonde d'un vecteur
scala> val vectCreux3 = vectCreux1.copy    // la méthode copy n'a pas d'arguments

Il est nécessaire d’importer explicitement org.apache.spark.mllib.linalg.Vectors car Scala importe par défaut scala.collection.immutable.Vector.

Un type particulier de vecteur est également proposé dans MLlib pour faciliter la représentation des données pour l’apprentissage supervisé : vecteur (ou point) étiquetté (labeled point). C’est un vecteur local, dense ou creux, associé à une réponse attendue représentée par un Double. Pour un problème régression la valeur attendue est représentée telle quelle par le Double, pour un problème de classification l’étiquette de classe est représentée par une valeur entière de ce Double : 0 ou 1 pour un problème à 2 classes, 0, 1, 2, … pour un problème à plus de 2 classes.

scala> import org.apache.spark.mllib.linalg.Vectors // si ce n'est déjà fait
scala> import org.apache.spark.mllib.regression.LabeledPoint

// Créer un point étiquetté avec une étiquette positive et un vecteur dense
scala> val pointPos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))

// Créer un point étiquetté avec une étiquette négative et un vecteur creux
scala> val pointNeg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))

Une matrice locale a des indices de type Int dont la valeur commence à 0 pour les lignes et pour les colonnes, et des valeurs de type Double. MLlib propose uniquement des matrices locales denses, classe DenseMatrix. Les valeurs des éléments d’une telle matrice sont gardées dans un tableau unidimensionnel en concaténant les colonnes successives de la matrice. Un exemple de création d’une matrice dense à 3 lignes et 2 colonnes :

scala> import org.apache.spark.mllib.linalg.{Matrix, Matrices, DenseMatrix}

// Créer une matrice dense ((1.2, 2.3), (3.4, 4.5), (5.6, 6.7))
scala> val matDense = new DenseMatrix(3, 2, Array(1.2, 3.4, 5.6, 2.3, 4.5, 6.7))

Question :

Calculez dans un vecteur vectProduit le produit entre matDense et un vecteur dense de composantes (1.0, 2.0). Pour cela, cherchez la méthode à utiliser dans la documentation en ligne de l’API en Scala (entrez DenseMatrix dans l’espace de recherche, ensuite choisissez la bonne classe et examinez ses méthodes)).

Lecture des données numériques

La lecture de données numériquees à partir de fichiers texte (directement lisibles par un humain), éventuellement écrits avec saveAsTextFile (voir la séance de TP précédente) peut être faite avec MLUtils.loadVectors ou MLUtils.loadLabeledPoints. Pour pouvoir utiliser ces fonctions nous chercherons d’abord quelques fichiers de données. Ouvrez une seconde fenêtre terminal et entrez les commandes suivantes :

$ cd tpnum
$ mkdir data
$ cd data
$ wget http://cedric.cnam.fr/~crucianm/src/geysers.txt
$ wget http://cedric.cnam.fr/~crucianm/src/geyser.txt
$ wget https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_libsvm_data.txt --no-check-certificate

Voici un exemple de création de RDD à partir d’un fichier texte représentant 272 observations (1 observation par ligne) pour 2 variables (une variable est représentée par une colonne) :

scala> import org.apache.spark.mllib.util.MLUtils
scala> import org.apache.spark.rdd.RDD

scala> val lignes = MLUtils.loadVectors(sc, "file:///home/cloudera/tpnum/data/geysers.txt")

Ce RDD sera utilisé dans la suite pour construire une matrice distribuée.

MLlib inclut des fonctions de lecture de vecteurs étiquettés (labeled point) dans un format spécifique à la librairie LIBSVM (le même format est utilisé par LIBLINEAR). C’est un format texte dans lequel chaque ligne représente un vecteur étiquetté creux dans le format suivant label index1:value1 index2:value2 ..., où les indices (>= 1) sont en ordre croissant. Après chargement, les indices sont convertis pour être plutôt >= 0. MLUtils.loadLibSVMFile permet la lecture de fichiers suivant ce format et la création d’un RDD contenant ces données :

scala> import org.apache.spark.mllib.regression.LabeledPoint
scala> import org.apache.spark.mllib.util.MLUtils
scala> import org.apache.spark.rdd.RDD

scala> val exemplesLibSVM: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "file:///home/cloudera/tpnum/data/sample_libsvm_data.txt")

Il n’est pas indispensable d’écrire : RDD[LabeledPoint] ci-dessus après val exemplesLibSVM, amis cela permet de voir directement le type de exemplesLibSVM. Nous ferons parfois cela dans la suite.

Il est également possible de sauvegarder un RDD[LabeledPoint] dans un format LIBSVM avec MLUtils.saveAsLibSVMFile.

Enfin, des fonctions de lecture spécifiques peuvent être écrites lorsque le format du fichier est spécifique. Par exemple, pour un fichier qui contient un vecteur par ligne, avec des valeurs numériques successives séparées par des espaces :

scala> import org.apache.spark.mllib.linalg.Vectors

scala> val donnees = sc.textFile("file:///home/cloudera/tpnum/data/geyser.txt")
scala> val lignes2 = donnees.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
scala> lignes2.count()

Question :

Expliquez le déroulement des opérations lors de la création du RDD lignes2.

Matrices distribuées

Une matrice distribuée a des indices de type Long dont la valeur commence à 0 pour les lignes et pour les colonnes, et des valeurs de type Double. MLlib propose trois types de matrices distribuées :
  • RowMatrix : matrice sans indices de lignes. Une RowMatrix a comme support un RDD de ses lignes, chaque ligne étant suffisamment courte pour être représentée par un vecteur local. Le nombre de lignes peut en revanche être très élevé.

  • IndexedRowMatrix : possède des indices des lignes, pour le reste est comme RowMatrix.

  • CoordinateMatrix : matrice qui utilise comme support un RDD de ses éléments ; format adapté aux matrices qui ont à la fois un grand nombre de lignes et un grand nombre de colonnes mais sont très creuses.

Voici quelques exemples :

scala> import org.apache.spark.mllib.linalg.Vector // si ce n'est déjà fait
scala> import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}

// Créer une RowMatrix à partir d'un RDD[Vector]
scala> val matLignes: RowMatrix = new RowMatrix(lignes)

// Déterminer la taille de la matrice
scala> val m = matLignes.numRows()
scala> val n = matLignes.numCols()

// Créer une IndexedRowMatrix à partir d'un RDD[IndexedRow]
scala> val idxRows: RDD[IndexedRow] = sc.parallelize(Seq((0L, Vectors.dense(0.0, 1.0, 2.0)),(1L, Vectors.dense(3.0, 4.0, 5.0)),
           (2L, Vectors.dense(6.0, 7.0, 8.0))).map(x => IndexedRow(x._1, x._2)))
scala> val matLignesIndexees: IndexedRowMatrix = new IndexedRowMatrix(idxRows)

// Déterminer la taille de la matrice
scala> val m = matLignesIndexees.numRows()
scala> val n = matLignesIndexees.numCols()

Il est, enfin, possible de créer une CoordinateMatrix à partir d’un RDD[MatrixEntry].

Opérations sur vecteurs et matrices

Pour les matrices distribuées RowMatrix et IndexedRowMatrix, MLlib propose la multiplication à droite par une matrice locale (dense).

Question :

Calculez dans une matrice matLignes2 le produit entre matLignes et la matrice dense ((1.0 0.0) (0.0 1.0)). Pour cela, cherchez la méthode à utiliser dans la documentation en ligne de l’API en Scala.

Plusieurs autres opérations sont également proposées pour une RowMatrix (format m x n, c’est à dire à m lignes et n colonnes), par exemple :
  • columnSimilarities(): CoordinateMatrix : calcul des similarités cosinus entre les n colonnes de la matrice ; retourne une matrice creuse triangulaire supérieure n x n.

  • computeColumnSummaryStatistics(): MultivariateStatisticalSummary : retourne, pour chaque colonne, le nombre d’éléments, la valeur maximale et minimale, la moyenne, la variance, les normes L1 et L2.

  • computeCovariance(): Matrix : retourne la matrice des covariances empiriques entre les variables (colonnes), en considérant que chaque ligne est une observation.

  • computePrincipalComponents(k: Int): Matrix : retourne dans une matrice locale n x k la projection des observations (lignes de la matrice) sur les k premières composantes principales, triées en ordre décroissant des valeurs propres ; l’analyse est appliquée directement, sans centrage (soustraction de la moyenne) ni réduction (division par l’écart-type) des variables (colonnes de la matrice initiale).

Un exemple d’utilisation :

// Matrice des variances et covariances
scala> matLignes.computeCovariance()

// Statistiques par colonne
scala> val matSummary = matLignes.computeColumnSummaryStatistics()
scala> matSummary.count     // nombre d'éléments des colonnes
scala> matSummary.max       // élément maximal pour chaque colonne
scala> matSummary.min       // élément minimal pour chaque colonne
scala> matSummary.mean      // moyenne des éléments par colonne
scala> matSummary.variance  // variance par colonne
scala> matSummary.normL1    // norme L1 par colonne
scala> matSummary.normL2    // norme L2 par colonne
scala> matSummary.numNonzeros  // nombre d'éléments non nuls par colonne

Question : calculez la matrice des variances et covariances pour matLignes2. Quel rapport a-t-elle avec celle obtenue pour matlignes ?

Nous examinerons de plus près computePrincipalComponents() lors du prochain TP.

Certaines de ces opérations ne sont pas directement disponibles pour IndexedRowMatrix, en revanche il est possible de passer par une conversion toRowMatrix afin d’accéder à toutes les opérations de RowMatrix :

scala> val matLignes2 = matLignesIndexees.toRowMatrix()

Aussi, la seule opération actuellement disponible sur les CoordinateMatrix est la conversion à une IndexRowMatrix dont les lignes sont des vecteurs creux (locaux !) : val matLignesIndexees2 = matCoordonnees.toIndexedRowMatrix().

Ecrire et exécuter une application dans Spark

Lors du premier TP nous avons utilisé Spark en transmettant, à travers spark-shell, une par une des instructions écrites en Scala. Ce mode de fonctionnement est adapté à l’exécution d’opérations simple et lors de l’étape de mise au point de programmes. Il est toutefois nécessaire de pouvoir écrire des programmes pour Spark (en Scala, dans le cadre de nos TP), de les corriger ou modifier éventuellement et de les lancer en exécution.

Ecriture de l’application

Nous ferons d’abord une copie locale d’un fichier HTML qui servira de source de données et préparerons la structure des répertoires nécessaires. Pour cela, dans une nouvelle fenêtre terminal entrez les commandes suivantes :

$ cd tpnum/data
$ wget http://cedric.cnam.fr/vertigo/Cours/RCP216/tpSparkScala.html
$ cd ..
$ mkdir -p src/main/scala

Lancez un éditeur de texte (KWrite par exemple, dans le menu Applications_pedagogiques, sous-menu Editeurs) et copiez dans cet éditeur le programme Scala suivant :

/* SimpleProg.scala */
org.apache.spark.sql.SparkSession

object SimpleProg {
  def main(args: Array[String]) {
    val myFile = "data/tpSparkScala.html"
    val spark = SparkSession.builder.appName("Application Simple").getOrCreate()
    val myData = spark.read.textFile(myFile).cache()
    val nbclass = myData.filter(line => line.contains("class")).count()
    val nbjavascript = myData.filter(line => line.contains("javascript")).count()
    println("Lignes avec javascript : %s, lignes avec class : %s".format(nbjavascript, nbclass))
    spark.stop()
  }
}

Enregistrez-le dans un fichier SimpleProg.scala dans le répertoire ~/tpnum/src/main/scala. Ici ~/ indique votre répertoire $HOME (que vous pouvez connaître avec la commande linux echo $HOME).

Ce programme très simple compte le nombre de lignes du fichier tpSparkScala.html qui contiennent « javascript » et le nombre de lignes qui contiennent « class ». Lors de l’utilisation de spark-shell un objet SparkSession était créé automatiquement. Lorsqu’on écrit une application autonome il est nécessaire d’initialiser un objet SparkSession explicitement. Pour cela, il est nécessaire d’appeler SparkSession.builder.

Question : quel est l’intérêt de l’utilisation de .cache() pour le Dataset myData ?

Le programme dépendant de l’API Spark, nous préparerons aussi un programme de configuration pour SBT, simpleprog.sbt, qui indique les dépendances nécessaires. Pour cela, lancez l’éditeur de texte et copiez dans l’éditeur les lignes suivantes (attention, les lignes vides de séparation sont nécessaires !) :

name := "Simple Programme"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"

Il faut indiquer les versions de Scala et de Spark qui sont installées sur votre système (ici 2.11.8 et respectivement 2.2.0). Enregistrez ce texte dans un fichier simpleprog.sbt dans le répertoire ~/tpnum

Si SBT n’est pas présent sur votre système, il est nécessaire de l’installer (voir http://www.scala-sbt.org). SBT est déjà installé dans les salles de travaux pratiques au Cnam.

Il est maintenant possible de créer avec SBT le .jar contenant le programme. Dans une fenêtre terminal (shell), entrez les commandes suivantes :

$ cd tpnum
$ sbt package

Durant les env. 20 minutes nécessaires à SBT pour la création du premier .jar (rassurez-vous, ce sera beaucoup plus rapide pour les suivants) vous pouvez poursuivre la lecture du support de TP.

Lancement en exécution avec spark-submit

Dès que SBT a terminé la création de simple-programme_2.11-1.0.jar (affichage de [info] Packaging {..}/{..}/target/scala-2.10/simple-programme_2.11-1.0.jar) il est possible de lancer en exécution l’application avec spark-submit. Le script spark-submit permet d’utiliser tous les cluster managers que Spark sait gérer, de façon transparente à l’application. Dans une fenêtre terminal (shell), entrez :

$ spark-submit --class "SimpleProg" --master local target/scala-2.11/simple-programme_2.11-1.0.jar

si tout se passe bien, vous devriez obtenir la réponse

...
Lignes avec javascript : 5, lignes avec class : 231
Les options transmises à spark-submit sont :
  • --class : le point d’entrée pour l’application, ici SimpleProg ;

  • --master : l’URL du master sur le cluster, ici local qui demande l’exécution de l’application en local sur un seul cœur (local[2] : sur 2 cœurs, local[*]: sur tous les cœurs disponibles).

D’autres options sont souvent utilisées :

$ spark-submit \
    --class <main-class>
    --master <master-url> \
    --deploy-mode <deploy-mode> \
    --conf <key>=<value> \
    ... # autres options
    <application-jar> \
    [arguments de l'application]
avec
  • --deploy-mode : s’il faut faire tourner le driver sur un des nœuds du cluster (cluster) ou localement sur un client externe (client) (par défaut client) ;

  • --conf : propriétés de configuration de Spark en format « clé=valeur » ;

  • application-jar : chemin vers un .jar « empaqueté » (bundled) contenant l’application et toutes les dépendances et obtenu avec SBT ; l’URL doit être visible à partir de chaque nœud du cluster ;

  • application-arguments : si nécessaire, arguments à passer à la méthode main de l’application.

La totalité des options, dont certaines spécifiques au cluster manager employé, peuvent être obtenues avec spark-submit --help. Aussi, des informations détaillées sur le processus de lancement et sur l’exécution peuvent être obtenues en ajoutant l’option --verbose lors du lancement de spark-submit.

Des informations de configuration sont également présentes dans SPARK_HOME/conf/spark-defaults.conf (si le fichier spark-defaults.conf n’existe pas, il faut le créer à partir de spark-defaults.conf.template), mais sont moins prioritaires que les options indiquées en ligne de commande lors du lancement de spark-submit.