.. _chap-tpClassificationTweets: ########################################################################################## TP++ - Classification automatique de tweets avec des représentations Word2Vec ########################################################################################## (les TP++ sont des compléments à l'enseignement de base et ne font pas partie des séances de travaux pratiques avec encadrement) Références externes utiles : * `Documentation Spark `_ * `Documentation API Spark en Scala `_ * `Documentation Scala `_ L'objectif de cette séance de travaux pratiques est de réaliser une classification automatique d'un ensemble de tweets en utilisant une représentation vectorielle de type Word2Vec. Apache Spark doit avoir été préalablement installé sur votre ordinateur. Si ce n'est pas déjà fait mais vous disposez d'une version de linux installée, vous pouvez suivre, par exemple, `ces instructions `_. Si un linux n'est pas déjà installé, vous pouvez regarder `l'ensemble de ces instructions `_, mais la totalité de l'installation prendra plus de 30 minutes et exigera le transfert de volumes importants de données. Dans le cadre de cette séance, chacun utilisera Spark sur son propre ordinateur et non sur un *cluster*. Un parallélisme dans l'exécution est envisageable si l'ordinateur dispose de plusieurs cœurs de calcul, les autres ressources (mémoire, stockage) de l'ordinateur seront partagées. Aussi, l'exécution étant locale, Hadoop n'est pas indispensable, c'est le système de fichiers local qui sera employé. Lancement de ``spark-shell`` ======================================= Nous emploierons le langage de programmation ``scala`` (et l'API Spark en ``scala``). Un programme écrit en ``scala`` s'exécute dans une machine virtuelle java et peut appeler directement des librairies ``java``. Nous nous servirons de l'interface en ligne de commande ``spark-shell`` (basée sur celle du langage ``scala``), qui est de type REPL (Read-Eval-Print-Loop). Bien entendu, Spark peut également être utilisé à partir d'un environnement de programmation intégré. Ouvrez une fenêtre de type « terminal », créez un répertoire ``tptweets`` positionnez-vous dans ce répertoire. Vous devez lancer ensuite ``spark-shell`` avec les options suivantes : utilisation de 2 Go de mémoire vive et d'un nombre de *workers* (nœuds de calcul) égal au nombre de cœurs logiques de votre ordinateur. N'utilisez pas toute la mémoire de l'ordinateur pour Spark, votre système d'exploitation en a besoin aussi. Si vous n'avez pas à disposition 2 Go de mémoire pour Spark, vous pouvez en employer moins mais dans ce cas il vous faudra travailler sur une partie des données seulement. Lors du lancement de ``spark-shell`` et de l'exécution, Spark affiche un certain nombre de messages d'information et d'avertissements. Afin de mieux voir les résultats et les éventuelles erreurs, il est utile de limiter les autres messages. Pour cela, créez le fichier ``log4j.properties`` à partir du ``template`` correspondant .. code-block:: bash cp $SPARK_HOME/conf/log4j.properties.template $SPARK_HOME/conf/log4j.properties et, dans ce fichier, sur la ligne ``log4j.rootCategory=INFO, console``, remplacez INFO par ERROR (ou WARN si vous souhaitez voir également les avertissements). Le passage en ``root`` peut être nécessaire pour cela (ou l'utilisation de ``sudo``). Enfin, lancez ``spark-shell`` : .. code-block:: bash spark-shell --driver-memory 2g --master local[*] Importation des librairies nécessaires ======================================= Entrez .. code-block:: scala import org.apache.spark._ import org.apache.spark.rdd._ import org.apache.spark.SparkContext._ import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel} import org.apache.spark.mllib.linalg.{Vector, Vectors, DenseVector, SparseVector} import org.apache.spark.mllib.clustering.KMeans import org.apache.spark.mllib.util.KMeansDataGenerator Ici, ``_`` a une signification de ``*`` (*wildcard*) et ``{...}`` indique une liste. Utilisation d'opérations vectorielles issues de ``breeze`` ========================================================== Pour simplifier l'écriture de certaines opérations vectorielles dans le programme, il est utile de faire appel à une librairie comme ``breeze`` (des alternatives existent) : .. code-block:: scala import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV} import org.apache.spark.mllib.linalg.{Vector => SparkVector} def toBreeze(v:SparkVector) = BV(v.toArray) def fromBreeze(bv:BV[Double]) = Vectors.dense(bv.toArray) def add(v1:SparkVector, v2:SparkVector) = fromBreeze(toBreeze(v1) + toBreeze(v2)) def scalarMultiply(a:Double, v:SparkVector) = fromBreeze(a * toBreeze(v)) Préparation des données ======================= Un (petit) ensemble de tweets a été obtenu au préalable, les tweets (étiquetés par Twitter comme étant) en anglais ont été sélectionnés et leur contenu textuel a été extrait et enregistré (une ligne de texte par tweet) dans un fichier que vous devez télécharger dans le répertoire ``tptweets`` : .. code-block:: bash wget http://cedric.cnam.fr/~crucianm/src/tweets.zip unzip tweets.zip Plus tard lors de cette séance de TP vous verrez comment ces tweets ont été obtenus. Le fichier de données (tweets) est stocké ici dans le système de fichiers local. Lors du déploiement sur un *cluster* il est nécessaire de stocker les données dans le système de fichiers distribué (HDFS) et de les lire depuis ce système. Avant de faire la classification des tweets, nous supprimerons les *stop words* de leur contenu textuel. Pour cela, il vous faut également télécharger un fichier qui contient ces *stop words* : .. code-block:: bash wget http://cedric.cnam.fr/~crucianm/src/stop_words Modèle Word2Vec =============== Pour la classification automatique, le texte de chaque tweet est représenté par un vecteur de dimension fixe (ici égale à 100). Le vecteur associé au texte d'un tweet est le centre de gravité des vecteurs Word2Vec qui représentent les mots de ce texte (après suppression des *stop words*). Les représentations vectorielles Word2Vec sont construites automatiquement à partir de ressources textuelles volumineuses constituées de textes bien formés dans une langue cible. L'objectif de ces représentations est de prédire le mieux possible le contexte des mots. En conséquence, la similarité entre les représentations Word2Vec de deux mots intègre à la fois une similarité sémantique et une similarité de rôle dans la phrase. Des explications plus détaillées peuvent être trouvées dans `ce cours `_ et dans les références qui y sont indiquées. Nous reviendrons plus tard lors de cette séance sur la construction des représentations Word2Vec, processus relativement long sur un seul ordinateur et exigeant en mémoire vive. Pour la classification, nous nous servirons de représentations calculées au préalable, que vous devez télécharger (également dans le répertoire ``tptweets``) : .. code-block:: bash wget http://cedric.cnam.fr/~crucianm/src/w2vModel.zip unzip w2vModel.zip Chargement et distribution des *stop words* et du modèle Word2Vec ================================================================= Le filtrage des *stop words*, ainsi que le calcul des représentations Word2Vec pour les tweets, doivent être réalisés sur les *workers* (nœuds de calcul). Il est donc nécessaire de distribuer au préalable aux *workers* l'ensemble des *stop words*, ainsi que le modèle Word2Vec (l'association entre les mots du vocabulaire et leurs représentations pré-calculées). La solution est l'utilisation de « variables » de type *broadcast* : .. code-block:: scala // lire les stop words import scala.io.Source val stopWords = Source.fromFile("stop_words").getLines.toSet // transmettre les stop words aux noeuds de calcul val bStopWords = sc.broadcast(stopWords) // lire le Word2VecModel val w2vModel = Word2VecModel.load(sc, "w2vModel") // obtenir une Map[String, Array[Float]] sérializable // mapValues seul ne retourne pas une map sérializable (SI-7005) val vectors = w2vModel.getVectors.mapValues(vv => Vectors.dense(vv.map(_.toDouble))).map(identity) // transmettre la map aux noeuds de calcul val bVectors = sc.broadcast(vectors) Calcul des représentations Word2Vec pour les textes des tweets ============================================================== Il est maintenant possible de lire les données (du système de fichiers local, dans le cadre de ce TP) dans un RDD et de calculer les représentations Word2Vec pour les textes des tweets. Il est possible de copier et coller dans ``spark-shell`` des instructions multi-lignes en entrant d'abord ``:paste``. .. code-block:: scala // taille par défaut des vecteurs Word2Vec val vectSize = 100 // lecture du fichier de tweets dans un RDD (item = ligne) val sentences = sc.textFile("tweets") // calcul des représentations Word2Vec des tweets val sent2vec = sentences.filter(sentence => sentence.length >= 1) .map(sentence => sentence.toLowerCase.split("\\W+")) .map(wordSeq => { var vSum = Vectors.zeros(vectSize) var vNb = 0 wordSeq.foreach { word => if(!(bStopWords.value)(word) & (word.length >= 2)) { bVectors.value.get(word).foreach { v => vSum = add(v, vSum) vNb += 1 } } } if (vNb != 0) { vSum = scalarMultiply(1.0 / vNb, vSum) } vSum }).filter(vec => Vectors.norm(vec, 1.0) > 0.0).persist() Quelles sont les opérations réalisées ? D'abord, les lignes de longueur inférieure à 2 sont supprimées. Les caractères sont transformés en minuscules car le vocabulaire Word2Vec est en minuscules, ensuite les phrases sont découpées en mots en assimilant les caractères non alphabétiques à des espaces (d'autres choix sont possibles ici). Le vecteur pour chaque phrase est obtenu en faisant l'addition des mots qui composent la phrase et en divisant par leur nombre Il faut noter que ``bVectors.value.get(word)`` retourne un ``Option[Vector]``, c'est à dire un ``Vector`` si le mot est dans le vocabulaire et rien si le mot n'est pas dans le vocabulaire (cela permet de comprendre ``bVectors.value.get(word).foreach``). Une phrase non vide mais pour laquelle aucun mot n'est dans le vocabulaire sera représentée par le vecteur nul ; pour éviter d'obtenir un cluster formé de vecteurs nuls, ces vecteurs sont éliminés (filtrage par ``Vectors.norm(vec, 1.0) > 0.0``). Enfin, le RDD est rendu persistant car la classification qui suit est un processus itératif. Examinez le nombre de vecteurs obtenus : .. code-block:: scala sent2vec.count() Quel est l'effet de ``.count()`` ici ? Classification automatique : *K-means* avec initialisation par K-means|| ======================================================================== Spark MLlib propose une implémentation de l'algorithme de classification automatique *k-means*. L'initialisation des centres est réalisée par l'algorithme parallèle `k-means|| `_, avec 5 itérations (``initializationSteps: 5``, peut être modifié avec ``KMeans.setInitializationSteps``). La classification est obtenue en appelant ``KMeans.train``, avec des paramètres dont la signification est : * ``data`` : données à classifier, sous forme de ``RDD[Vector]`` * ``k`` : nombre de groupes (*clusters*) * ``maxIterations`` : nombre maximal d'itérations ; arrêt de l'exécution lorsque les centres sont stabilisés (à ``epsilon: 1e-4`` près) ou ``maxIterations`` est atteint ; ``epsilon`` peut être modifié en appelant ``KMeans.epsilon`` * ``runs`` : nombre d'exécutions (1 par défaut) ; c'est le meilleur modèle qui est retourné * ``initializationMode`` : mode d'initialisation des centres, soit ``random`` soit ``k-means||`` (par défaut) * ``seed`` : initialisation du générateur de nombres aléatoires Pour réaliser la classification automatique des vecteurs de ``sent2vec``, entrez dans ``spark-shell`` les commandes suivantes : .. code-block:: scala val nbClusters = 5 val nbIterations = 200 val clustering = KMeans.train(sent2vec, nbClusters, nbIterations) La classification peut prendre un certain temps sur votre ordinateur, profitez-en pour lire la suite. Affichage des résultats ======================= Quelle est la signification des regroupements (*clusters*) obtenus ? Pour l'examiner, une solution simple est de considérer les vecteurs des centres de groupes et de retourner, pour chacun de ces vecteurs, les mots du vocabulaires dont les représentations Word2Vec sont les plus proches : .. code-block:: scala clustering.clusterCenters.foreach(clusterCenter => { w2vModel.findSynonyms(clusterCenter,5).foreach(synonym => print(" %s (%5.3f)," .format(synonym._1, synonym._2))) println() }) Que constatez-vous ? Tous ces mots font-ils partie des tweets analysés ? Réfléchissez à d'autres méthodes permettant de mieux comprendre le contenu des *clusters* résultants. Comment obtenir le corpus de tweets =================================== Pour obtenir un corpus de tweets, il est nécessaire de réceptionner un flux de Twitter et le filtrer (par hashtags, par langue, etc.). Afin de pouvoir recevoir des données à partir de votre application, il est nécessaire d'être certifié (obtenir les *credentials*), suivez `ce guide `_ (« 1.2. Twitter Credential Setup »). Les outils nécessaires pour accéder au flux Twitter doivent être inclus dans une archive ``.jar``, créée avec l'utilitaire SBT à partir des sources qui se trouvent dans le fichier ``$SPARK_HOME/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala`` (à copier dans votre répertoire ``tptweets``) et du fichier ``build.sbt`` suivant : .. code-block:: scala name := "TwitterPopularTags" version := "1.0" mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) => { case PathList("META-INF", xs @ _*) => MergeStrategy.discard case x => MergeStrategy.first } } libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1" % "provided" libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided" libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.1" exclude("org.twitter4j", "twitter4j") libraryDependencies += "org.twitter4j" % "twitter4j-stream" % "3.0.6" resolvers += "Akka Repository" at "http://repo.akka.io/releases/" (conserver les lignes vides !) Nous avons remplacé ici la version 3.0.3 de twitter4j (utilisée par défaut dans Spark 1.6.1) par la version 3.0.6 car à partir de cette dernière nous avons accès au filtrage par la langue (``status.getLang``). Pour l'installation et l'utilisation de SBT vous avez quelques indications `ici `_. **Dans le cas présent, il faudra lancer SBT avec** ``sbt assembly`` (et **non** ``sbt package``). Lancez ensuite ``spark-shell`` avec ``spark-shell --master local[3] --jars target/scala-2.10/TwitterPopularTags.jar`` et entrez : .. code-block:: scala import org.apache.spark.SparkConf import org.apache.spark.SparkContext._ import org.apache.spark.streaming.Seconds import org.apache.spark.streaming.twitter._ import org.apache.spark.streaming.StreamingContext._ // un Array de strings peut servir à filtrer les tweets //val filters = Array("") // préciser les OAuth credentials obtenus au préalable val consumerKey = "..." val consumerSecret = "..." val accessToken = "..." val accessTokenSecret = "..." System.setProperty("twitter4j.oauth.consumerKey", consumerKey) System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret) System.setProperty("twitter4j.oauth.accessToken", accessToken) System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret) // tranches de 60 secondes val ssc = new StreamingContext(sc, Seconds(60)) val tweetStream = TwitterUtils.createStream(ssc, None) //val tweetStream = TwitterUtils.createStream(ssc, None, filters) // test : afficher le nombre de tweets en anglais reçus par tranche //tweetStream.foreachRDD((rdd,time) => { // println("%s at time %s".format(rdd.filter(status => // status.getLang.startsWith("en")) // .map(status => status.getText) // .count(),time.milliseconds))}) // filtrer par lang=en, enregistrer chaque RDD (tranche du flux) dans un // nouveau répertoire ; nécessaire d'inclure la version >= 3.0.6 de twitter4j // dans le jar à la place de la version par défaut 3.0.3 (sans getLang) tweetStream.foreachRDD((rdd,time) => rdd.filter(status => status.getLang.startsWith("en")) .map(status => status.getText) .saveAsTextFile("tweets"+(time.milliseconds.toString))) // démarrer le traitement du flux ssc.start() // arrêter (gentiment) le traitement du flux mais pas le spark context ssc.stop(false,true) Comment obtenir le modèle Word2Vec ================================== Pour obtenir un modèle Word2Vec, il faut suivre `le guide `_. Lors d'une exécution en local, il est nécessaire d'augmenter la borne supérieure pour la mémoire allouée à la machine virtuelle java (``spark-shell --driver-memory 4g ...``) ou, si la quantité de mémoire disponible n'est pas suffisante, réduire la taille du fichier de textes. Attention toutefois à ne pas le réduire trop (à moins d'un quart du fichier de départ) car la qualité des représentations vectorielles résultantes serait trop faible. .. code-block:: scala import org.apache.spark._ import org.apache.spark.rdd._ import org.apache.spark.SparkContext._ import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel} val input = sc.textFile("text8").map(line => line.split(" ").toSeq) val word2vec = new Word2Vec() val w2vModel = word2vec.fit(input) // enregistrer le modèle w2vModel.save(sc, "w2vModel") Questions ========= .. admonition:: Question : Variez (surtout à la hausse...) le nombre de centres pour la classification automatique, examinez la « lisibilité » des *clusters*.