Travaux pratiques - Manipulation de données numériques. Exécution d’applications¶
(la nouvelle version de cette séance, utilisant l’API DataFrame)
Références externes utiles :
Les exemples ci-dessous reprennent, en partie, ceux des documentations en ligne.
Données numériques dans MLlib¶
MLlib propose des vecteurs locaux et matrices locales (stockés sur un seul nœud de calcul), ainsi que des matrices distribuées sous forme de RDD. Les opérations d’algèbre linéaire se basent sur les librairies Breeze
et jblas
.
Vecteurs locaux, matrices locales¶
Un vecteur local a des indices de type Int
dont la valeur commence à 0 et des valeurs de type Double
. MLlib propose des vecteurs denses (classe DenseVector
) ou creux (classe SparseVector
). Un vecteur dense emploie un seul tableau (array) de valeurs. Pour les vecteurs creux deux tableaux sont mis en correspondance, un d’indices et l’autre de valeurs. Voici quelques définitions de vecteurs denses et de vecteurs creux. Pour les essayer, ouvrez une fenêtre terminal et entrez
$ mkdir tpnum $ cd tpnum $ spark-shell
Ensuite, dans spark-shell
,
scala> import org.apache.spark.mllib.linalg.Vectors // Créer un vecteur dense (1.5, 0.0, 3.5) scala> val vectDense = Vectors.dense(1.5, 0.0, 3.5) // Créer un vecteur dense de dimension 100 avec toutes les composantes 0.0 scala> val vectZeros = Vectors.zeros(100) // Créer un vecteur creux (1.5, 0.0, 3.5) en indiquant les indices et valeurs // correspondant aux composantes non nulles scala> val vectCreux1 = Vectors.sparse(3, Array(0, 2), Array(1.5, 3.5)) // Créer un vecteur creux (1.5, 0.0, 3.5) en indiquant la séquence des // paires (indice, valeur) pour les composantes non nulles scala> val vectCreux2 = Vectors.sparse(3, Seq((0, 1.5), (2, 3.5))) // Accès à une valeur scala> println(vectCreux1(1)) scala> println(vectCreux1(2)) // Test d'égalité entre (contenus des) vecteurs scala> vectCreux1.equals(vectCreux2) scala> vectCreux1.equals(vectDense) scala> vectCreux1.equals(vectZeros) // Taille des vecteurs scala> vectDense.size // la méthode size n'a pas d'arguments scala> vectZeros.size scala> vectCreux1.size scala> vectCreux2.size // Copie profonde d'un vecteur scala> val vectCreux3 = vectCreux1.copy // la méthode copy n'a pas d'arguments
Il est nécessaire d’importer explicitement org.apache.spark.mllib.linalg.Vectors
car Scala importe par défaut scala.collection.immutable.Vector
.
Un type particulier de vecteur est également proposé dans MLlib pour faciliter la représentation des données pour l’apprentissage supervisé : vecteur (ou point) étiquetté (labeled point). C’est un vecteur local, dense ou creux, associé à une réponse attendue représentée par un Double
. Pour un problème régression la valeur attendue est représentée telle quelle par le Double
, pour un problème de classification l’étiquette de classe est représentée par une valeur entière de ce Double
: 0 ou 1 pour un problème à 2 classes, 0, 1, 2, … pour un problème à plus de 2 classes.
scala> import org.apache.spark.mllib.linalg.Vectors // si ce n'est déjà fait scala> import org.apache.spark.mllib.regression.LabeledPoint // Créer un point étiquetté avec une étiquette positive et un vecteur dense scala> val pointPos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)) // Créer un point étiquetté avec une étiquette négative et un vecteur creux scala> val pointNeg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
Une matrice locale a des indices de type Int
dont la valeur commence à 0 pour les lignes et pour les colonnes, et des valeurs de type Double
. MLlib propose uniquement des matrices locales denses, classe DenseMatrix
. Les valeurs des éléments d’une telle matrice sont gardées dans un tableau unidimensionnel en concaténant les colonnes successives de la matrice. Un exemple de création d’une matrice dense à 3 lignes et 2 colonnes :
scala> import org.apache.spark.mllib.linalg.{Matrix, Matrices, DenseMatrix} // Créer une matrice dense ((1.2, 2.3), (3.4, 4.5), (5.6, 6.7)) scala> val matDense = new DenseMatrix(3, 2, Array(1.2, 3.4, 5.6, 2.3, 4.5, 6.7))
Question :
Calculez dans un vecteur vectProduit
le produit entre matDense
et un vecteur dense de composantes (1.0, 2.0). Pour cela, cherchez la méthode à utiliser dans la documentation en ligne de l’API en Scala (entrez DenseMatrix
dans l’espace de recherche, ensuite choisissez la bonne classe et examinez ses méthodes)).
Lecture des données numériques¶
La lecture de données numériquees à partir de fichiers texte (directement lisibles par un humain), éventuellement écrits avec saveAsTextFile
(voir la séance de TP précédente) peut être faite avec MLUtils.loadVectors
ou MLUtils.loadLabeledPoints
. Pour pouvoir utiliser ces fonctions nous chercherons d’abord quelques fichiers de données. Ouvrez une seconde fenêtre terminal et entrez les commandes suivantes :
$ cd tpnum $ mkdir data $ cd data $ wget http://cedric.cnam.fr/~crucianm/src/geysers.txt $ wget http://cedric.cnam.fr/~crucianm/src/geyser.txt $ wget https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_libsvm_data.txt --no-check-certificate
Voici un exemple de création de RDD à partir d’un fichier texte représentant 272 observations (1 observation par ligne) pour 2 variables (une variable est représentée par une colonne) :
scala> import org.apache.spark.mllib.util.MLUtils scala> import org.apache.spark.rdd.RDD scala> val lignes = MLUtils.loadVectors(sc, "file:///home/cloudera/tpnum/data/geysers.txt")
Ce RDD sera utilisé dans la suite pour construire une matrice distribuée.
MLlib inclut des fonctions de lecture de vecteurs étiquettés (labeled point) dans un format spécifique à la librairie LIBSVM (le même format est utilisé par LIBLINEAR). C’est un format texte dans lequel chaque ligne représente un vecteur étiquetté creux dans le format suivant label index1:value1 index2:value2 ...
, où les indices (>= 1) sont en ordre croissant. Après chargement, les indices sont convertis pour être plutôt >= 0. MLUtils.loadLibSVMFile
permet la lecture de fichiers suivant ce format et la création d’un RDD contenant ces données :
scala> import org.apache.spark.mllib.regression.LabeledPoint scala> import org.apache.spark.mllib.util.MLUtils scala> import org.apache.spark.rdd.RDD scala> val exemplesLibSVM: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "file:///home/cloudera/tpnum/data/sample_libsvm_data.txt")
Il n’est pas indispensable d’écrire : RDD[LabeledPoint]
ci-dessus après val exemplesLibSVM
, amis cela permet de voir directement le type de exemplesLibSVM
. Nous ferons parfois cela dans la suite.
Il est également possible de sauvegarder un RDD[LabeledPoint]
dans un format LIBSVM avec MLUtils.saveAsLibSVMFile
.
Enfin, des fonctions de lecture spécifiques peuvent être écrites lorsque le format du fichier est spécifique. Par exemple, pour un fichier qui contient un vecteur par ligne, avec des valeurs numériques successives séparées par des espaces :
scala> import org.apache.spark.mllib.linalg.Vectors scala> val donnees = sc.textFile("file:///home/cloudera/tpnum/data/geyser.txt") scala> val lignes2 = donnees.map(s => Vectors.dense(s.split(' ').map(_.toDouble))) scala> lignes2.count()
Question :
Expliquez le déroulement des opérations lors de la création du RDD lignes2
.
Matrices distribuées¶
- Une matrice distribuée a des indices de type
Long
dont la valeur commence à 0 pour les lignes et pour les colonnes, et des valeurs de typeDouble
. MLlib propose trois types de matrices distribuées : RowMatrix
: matrice sans indices de lignes. UneRowMatrix
a comme support un RDD de ses lignes, chaque ligne étant suffisamment courte pour être représentée par un vecteur local. Le nombre de lignes peut en revanche être très élevé.IndexedRowMatrix
: possède des indices des lignes, pour le reste est commeRowMatrix
.CoordinateMatrix
: matrice qui utilise comme support un RDD de ses éléments ; format adapté aux matrices qui ont à la fois un grand nombre de lignes et un grand nombre de colonnes mais sont très creuses.
Voici quelques exemples :
scala> import org.apache.spark.mllib.linalg.Vector // si ce n'est déjà fait scala> import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix} // Créer une RowMatrix à partir d'un RDD[Vector] scala> val matLignes: RowMatrix = new RowMatrix(lignes) // Déterminer la taille de la matrice scala> val m = matLignes.numRows() scala> val n = matLignes.numCols() // Créer une IndexedRowMatrix à partir d'un RDD[IndexedRow] scala> val idxRows: RDD[IndexedRow] = sc.parallelize(Seq((0L, Vectors.dense(0.0, 1.0, 2.0)),(1L, Vectors.dense(3.0, 4.0, 5.0)), (2L, Vectors.dense(6.0, 7.0, 8.0))).map(x => IndexedRow(x._1, x._2))) scala> val matLignesIndexees: IndexedRowMatrix = new IndexedRowMatrix(idxRows) // Déterminer la taille de la matrice scala> val m = matLignesIndexees.numRows() scala> val n = matLignesIndexees.numCols()
Il est, enfin, possible de créer une CoordinateMatrix
à partir d’un RDD[MatrixEntry]
.
Opérations sur vecteurs et matrices¶
Pour les matrices distribuées RowMatrix
et IndexedRowMatrix
, MLlib propose la multiplication à droite par une matrice locale (dense).
Question :
Calculez dans une matrice matLignes2
le produit entre matLignes
et la matrice dense ((1.0 0.0) (0.0 1.0)). Pour cela, cherchez la méthode à utiliser dans la documentation en ligne de l’API en Scala.
- Plusieurs autres opérations sont également proposées pour une
RowMatrix
(format m x n, c’est à dire à m lignes et n colonnes), par exemple : columnSimilarities(): CoordinateMatrix
: calcul des similarités cosinus entre les n colonnes de la matrice ; retourne une matrice creuse triangulaire supérieure n x n.computeColumnSummaryStatistics(): MultivariateStatisticalSummary
: retourne, pour chaque colonne, le nombre d’éléments, la valeur maximale et minimale, la moyenne, la variance, les normes L1 et L2.computeCovariance(): Matrix
: retourne la matrice des covariances empiriques entre les variables (colonnes), en considérant que chaque ligne est une observation.computePrincipalComponents(k: Int): Matrix
: retourne dans une matrice locale n x k la projection des observations (lignes de la matrice) sur les k premières composantes principales, triées en ordre décroissant des valeurs propres ; l’analyse est appliquée directement, sans centrage (soustraction de la moyenne) ni réduction (division par l’écart-type) des variables (colonnes de la matrice initiale).
Un exemple d’utilisation :
// Matrice des variances et covariances scala> matLignes.computeCovariance() // Statistiques par colonne scala> val matSummary = matLignes.computeColumnSummaryStatistics() scala> matSummary.count // nombre d'éléments des colonnes scala> matSummary.max // élément maximal pour chaque colonne scala> matSummary.min // élément minimal pour chaque colonne scala> matSummary.mean // moyenne des éléments par colonne scala> matSummary.variance // variance par colonne scala> matSummary.normL1 // norme L1 par colonne scala> matSummary.normL2 // norme L2 par colonne scala> matSummary.numNonzeros // nombre d'éléments non nuls par colonne
Question : calculez la matrice des variances et covariances pour matLignes2
. Quel rapport a-t-elle avec celle obtenue pour matlignes
?
Nous examinerons de plus près computePrincipalComponents()
lors du prochain TP.
Certaines de ces opérations ne sont pas directement disponibles pour IndexedRowMatrix
, en revanche il est possible de passer par une conversion toRowMatrix
afin d’accéder à toutes les opérations de RowMatrix
:
scala> val matLignes2 = matLignesIndexees.toRowMatrix()
Aussi, la seule opération actuellement disponible sur les CoordinateMatrix
est la conversion à une IndexRowMatrix
dont les lignes sont des vecteurs creux (locaux !) : val matLignesIndexees2 = matCoordonnees.toIndexedRowMatrix()
.
Ecrire et exécuter une application dans Spark¶
Lors du premier TP nous avons utilisé Spark en transmettant, à travers spark-shell
, une par une des instructions écrites en Scala. Ce mode de fonctionnement est adapté à l’exécution d’opérations simple et lors de l’étape de mise au point de programmes. Il est toutefois nécessaire de pouvoir écrire des programmes pour Spark (en Scala, dans le cadre de nos TP), de les corriger ou modifier éventuellement et de les lancer en exécution.
Ecriture de l’application¶
Nous ferons d’abord une copie locale d’un fichier HTML qui servira de source de données et préparerons la structure des répertoires nécessaires. Pour cela, dans une nouvelle fenêtre terminal entrez les commandes suivantes :
$ cd tpnum/data $ wget http://cedric.cnam.fr/vertigo/Cours/RCP216/tpSparkScala.html $ cd .. $ mkdir -p src/main/scala
Lancez un éditeur de texte (KWrite
par exemple, dans le menu Applications_pedagogiques
, sous-menu Editeurs
) et copiez dans cet éditeur le programme Scala suivant :
/* SimpleProg.scala */ org.apache.spark.sql.SparkSession object SimpleProg { def main(args: Array[String]) { val myFile = "data/tpSparkScala.html" val spark = SparkSession.builder.appName("Application Simple").getOrCreate() val myData = spark.read.textFile(myFile).cache() val nbclass = myData.filter(line => line.contains("class")).count() val nbjavascript = myData.filter(line => line.contains("javascript")).count() println("Lignes avec javascript : %s, lignes avec class : %s".format(nbjavascript, nbclass)) spark.stop() } }
Enregistrez-le dans un fichier SimpleProg.scala
dans le répertoire ~/tpnum/src/main/scala
. Ici ~/
indique votre répertoire $HOME
(que vous pouvez connaître avec la commande linux echo $HOME
).
Ce programme très simple compte le nombre de lignes du fichier tpSparkScala.html
qui contiennent « javascript » et le nombre de lignes qui contiennent « class ». Lors de l’utilisation de spark-shell
un objet SparkSession
était créé automatiquement. Lorsqu’on écrit une application autonome il est nécessaire d’initialiser un objet SparkSession
explicitement. Pour cela, il est nécessaire d’appeler SparkSession.builder
.
Question : quel est l’intérêt de l’utilisation de .cache()
pour le Dataset myData
?
Le programme dépendant de l’API Spark, nous préparerons aussi un programme de configuration pour SBT, simpleprog.sbt
, qui indique les dépendances nécessaires. Pour cela, lancez l’éditeur de texte et copiez dans l’éditeur les lignes suivantes (attention, les lignes vides de séparation sont nécessaires !) :
name := "Simple Programme" version := "1.0" scalaVersion := "2.11.8" libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"
Il faut indiquer les versions de Scala et de Spark qui sont installées sur votre système (ici 2.11.8 et respectivement 2.2.0). Enregistrez ce texte dans un fichier simpleprog.sbt
dans le répertoire ~/tpnum
Si SBT n’est pas présent sur votre système, il est nécessaire de l’installer (voir http://www.scala-sbt.org). SBT est déjà installé dans les salles de travaux pratiques au Cnam.
Il est maintenant possible de créer avec SBT le .jar
contenant le programme. Dans une fenêtre terminal (shell), entrez les commandes suivantes :
$ cd tpnum $ sbt package
Durant les env. 20 minutes nécessaires à SBT pour la création du premier .jar
(rassurez-vous, ce sera beaucoup plus rapide pour les suivants) vous pouvez poursuivre la lecture du support de TP.
Lancement en exécution avec spark-submit¶
Dès que SBT a terminé la création de simple-programme_2.11-1.0.jar
(affichage de [info] Packaging {..}/{..}/target/scala-2.10/simple-programme_2.11-1.0.jar
) il est possible de lancer en exécution l’application avec spark-submit
. Le script spark-submit
permet d’utiliser tous les cluster managers que Spark sait gérer, de façon transparente à l’application. Dans une fenêtre terminal (shell), entrez :
$ spark-submit --class "SimpleProg" --master local target/scala-2.11/simple-programme_2.11-1.0.jar
si tout se passe bien, vous devriez obtenir la réponse
... Lignes avec javascript : 5, lignes avec class : 231
- Les options transmises à
spark-submit
sont : --class
: le point d’entrée pour l’application, iciSimpleProg
;--master
: l’URL dumaster
sur le cluster, icilocal
qui demande l’exécution de l’application en local sur un seul cœur (local[2]
: sur 2 cœurs,local[*]
: sur tous les cœurs disponibles).
D’autres options sont souvent utilisées :
$ spark-submit \ --class <main-class> --master <master-url> \ --deploy-mode <deploy-mode> \ --conf <key>=<value> \ ... # autres options <application-jar> \ [arguments de l'application]
- avec
--deploy-mode
: s’il faut faire tourner le driver sur un des nœuds du cluster (cluster
) ou localement sur un client externe (client
) (par défautclient
) ;--conf
: propriétés de configuration de Spark en format « clé=valeur » ;application-jar
: chemin vers un.jar
« empaqueté » (bundled) contenant l’application et toutes les dépendances et obtenu avec SBT ; l’URL doit être visible à partir de chaque nœud du cluster ;application-arguments
: si nécessaire, arguments à passer à la méthodemain
de l’application.
La totalité des options, dont certaines spécifiques au cluster manager employé, peuvent être obtenues avec spark-submit --help
. Aussi, des informations détaillées sur le processus de lancement et sur l’exécution peuvent être obtenues en ajoutant l’option --verbose
lors du lancement de spark-submit
.
Des informations de configuration sont également présentes dans SPARK_HOME/conf/spark-defaults.conf
(si le fichier spark-defaults.conf
n’existe pas, il faut le créer à partir de spark-defaults.conf.template
), mais sont moins prioritaires que les options indiquées en ligne de commande lors du lancement de spark-submit
.