TP++ - Classification automatique de tweets avec des représentations Word2Vec

(les TP++ sont des compléments à l’enseignement de base et ne font pas partie des séances de travaux pratiques avec encadrement)

Références externes utiles :

L’objectif de cette séance de travaux pratiques est de réaliser une classification automatique d’un ensemble de tweets en utilisant une représentation vectorielle de type Word2Vec.

Apache Spark doit avoir été préalablement installé sur votre ordinateur. Si ce n’est pas déjà fait mais vous disposez d’une version de linux installée, vous pouvez suivre, par exemple, ces instructions. Si un linux n’est pas déjà installé, vous pouvez regarder l’ensemble de ces instructions, mais la totalité de l’installation prendra plus de 30 minutes et exigera le transfert de volumes importants de données.

Dans le cadre de cette séance, chacun utilisera Spark sur son propre ordinateur et non sur un cluster. Un parallélisme dans l’exécution est envisageable si l’ordinateur dispose de plusieurs cœurs de calcul, les autres ressources (mémoire, stockage) de l’ordinateur seront partagées. Aussi, l’exécution étant locale, Hadoop n’est pas indispensable, c’est le système de fichiers local qui sera employé.

Lancement de spark-shell

Nous emploierons le langage de programmation scala (et l’API Spark en scala). Un programme écrit en scala s’exécute dans une machine virtuelle java et peut appeler directement des librairies java. Nous nous servirons de l’interface en ligne de commande spark-shell (basée sur celle du langage scala), qui est de type REPL (Read-Eval-Print-Loop). Bien entendu, Spark peut également être utilisé à partir d’un environnement de programmation intégré.

Ouvrez une fenêtre de type « terminal », créez un répertoire tptweets positionnez-vous dans ce répertoire. Vous devez lancer ensuite spark-shell avec les options suivantes : utilisation de 2 Go de mémoire vive et d’un nombre de workers (nœuds de calcul) égal au nombre de cœurs logiques de votre ordinateur. N’utilisez pas toute la mémoire de l’ordinateur pour Spark, votre système d’exploitation en a besoin aussi. Si vous n’avez pas à disposition 2 Go de mémoire pour Spark, vous pouvez en employer moins mais dans ce cas il vous faudra travailler sur une partie des données seulement.

Lors du lancement de spark-shell et de l’exécution, Spark affiche un certain nombre de messages d’information et d’avertissements. Afin de mieux voir les résultats et les éventuelles erreurs, il est utile de limiter les autres messages. Pour cela, créez le fichier log4j.properties à partir du template correspondant

cp $SPARK_HOME/conf/log4j.properties.template $SPARK_HOME/conf/log4j.properties

et, dans ce fichier, sur la ligne log4j.rootCategory=INFO, console, remplacez INFO par ERROR (ou WARN si vous souhaitez voir également les avertissements). Le passage en root peut être nécessaire pour cela (ou l’utilisation de sudo).

Enfin, lancez spark-shell :

spark-shell --driver-memory 2g --master local[*]

Importation des librairies nécessaires

Entrez

import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
import org.apache.spark.mllib.linalg.{Vector, Vectors, DenseVector, SparseVector}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.util.KMeansDataGenerator

Ici, _ a une signification de * (wildcard) et {...} indique une liste.

Utilisation d’opérations vectorielles issues de breeze

Pour simplifier l’écriture de certaines opérations vectorielles dans le programme, il est utile de faire appel à une librairie comme breeze (des alternatives existent) :

import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
import org.apache.spark.mllib.linalg.{Vector => SparkVector}
def toBreeze(v:SparkVector) = BV(v.toArray)
def fromBreeze(bv:BV[Double]) = Vectors.dense(bv.toArray)
def add(v1:SparkVector, v2:SparkVector) = fromBreeze(toBreeze(v1) + toBreeze(v2))
def scalarMultiply(a:Double, v:SparkVector) = fromBreeze(a * toBreeze(v))

Préparation des données

Un (petit) ensemble de tweets a été obtenu au préalable, les tweets (étiquetés par Twitter comme étant) en anglais ont été sélectionnés et leur contenu textuel a été extrait et enregistré (une ligne de texte par tweet) dans un fichier que vous devez télécharger dans le répertoire tptweets :

wget http://cedric.cnam.fr/~crucianm/src/tweets.zip
unzip tweets.zip

Plus tard lors de cette séance de TP vous verrez comment ces tweets ont été obtenus. Le fichier de données (tweets) est stocké ici dans le système de fichiers local. Lors du déploiement sur un cluster il est nécessaire de stocker les données dans le système de fichiers distribué (HDFS) et de les lire depuis ce système.

Avant de faire la classification des tweets, nous supprimerons les stop words de leur contenu textuel. Pour cela, il vous faut également télécharger un fichier qui contient ces stop words :

wget http://cedric.cnam.fr/~crucianm/src/stop_words

Modèle Word2Vec

Pour la classification automatique, le texte de chaque tweet est représenté par un vecteur de dimension fixe (ici égale à 100). Le vecteur associé au texte d’un tweet est le centre de gravité des vecteurs Word2Vec qui représentent les mots de ce texte (après suppression des stop words). Les représentations vectorielles Word2Vec sont construites automatiquement à partir de ressources textuelles volumineuses constituées de textes bien formés dans une langue cible. L’objectif de ces représentations est de prédire le mieux possible le contexte des mots. En conséquence, la similarité entre les représentations Word2Vec de deux mots intègre à la fois une similarité sémantique et une similarité de rôle dans la phrase. Des explications plus détaillées peuvent être trouvées dans ce cours et dans les références qui y sont indiquées.

Nous reviendrons plus tard lors de cette séance sur la construction des représentations Word2Vec, processus relativement long sur un seul ordinateur et exigeant en mémoire vive. Pour la classification, nous nous servirons de représentations calculées au préalable, que vous devez télécharger (également dans le répertoire tptweets) :

wget http://cedric.cnam.fr/~crucianm/src/w2vModel.zip
unzip w2vModel.zip

Chargement et distribution des stop words et du modèle Word2Vec

Le filtrage des stop words, ainsi que le calcul des représentations Word2Vec pour les tweets, doivent être réalisés sur les workers (nœuds de calcul). Il est donc nécessaire de distribuer au préalable aux workers l’ensemble des stop words, ainsi que le modèle Word2Vec (l’association entre les mots du vocabulaire et leurs représentations pré-calculées). La solution est l’utilisation de « variables » de type broadcast :

// lire les stop words
import scala.io.Source
val stopWords = Source.fromFile("stop_words").getLines.toSet

// transmettre les stop words aux noeuds de calcul
val bStopWords = sc.broadcast(stopWords)

// lire le Word2VecModel
val w2vModel = Word2VecModel.load(sc, "w2vModel")

// obtenir une Map[String, Array[Float]] sérializable
//   mapValues seul ne retourne pas une map sérializable (SI-7005)
val vectors = w2vModel.getVectors.mapValues(vv => Vectors.dense(vv.map(_.toDouble))).map(identity)

// transmettre la map aux noeuds de calcul
val bVectors = sc.broadcast(vectors)

Calcul des représentations Word2Vec pour les textes des tweets

Il est maintenant possible de lire les données (du système de fichiers local, dans le cadre de ce TP) dans un RDD et de calculer les représentations Word2Vec pour les textes des tweets. Il est possible de copier et coller dans spark-shell des instructions multi-lignes en entrant d’abord :paste.

// taille par défaut des vecteurs Word2Vec
val vectSize = 100

// lecture du fichier de tweets dans un RDD (item = ligne)
val sentences = sc.textFile("tweets")

// calcul des représentations Word2Vec des tweets
val sent2vec = sentences.filter(sentence => sentence.length >= 1)
    .map(sentence => sentence.toLowerCase.split("\\W+"))
    .map(wordSeq => {
        var vSum = Vectors.zeros(vectSize)
        var vNb = 0
        wordSeq.foreach { word =>
            if(!(bStopWords.value)(word) & (word.length >= 2)) {
                bVectors.value.get(word).foreach { v =>
                    vSum = add(v, vSum)
                    vNb += 1
                }
            }
        }
        if (vNb != 0) {
            vSum = scalarMultiply(1.0 / vNb, vSum)
        }
        vSum
    }).filter(vec => Vectors.norm(vec, 1.0) > 0.0).persist()

Quelles sont les opérations réalisées ?

D’abord, les lignes de longueur inférieure à 2 sont supprimées. Les caractères sont transformés en minuscules car le vocabulaire Word2Vec est en minuscules, ensuite les phrases sont découpées en mots en assimilant les caractères non alphabétiques à des espaces (d’autres choix sont possibles ici). Le vecteur pour chaque phrase est obtenu en faisant l’addition des mots qui composent la phrase et en divisant par leur nombre

Il faut noter que bVectors.value.get(word) retourne un Option[Vector], c’est à dire un Vector si le mot est dans le vocabulaire et rien si le mot n’est pas dans le vocabulaire (cela permet de comprendre bVectors.value.get(word).foreach).

Une phrase non vide mais pour laquelle aucun mot n’est dans le vocabulaire sera représentée par le vecteur nul ; pour éviter d’obtenir un cluster formé de vecteurs nuls, ces vecteurs sont éliminés (filtrage par Vectors.norm(vec, 1.0) > 0.0). Enfin, le RDD est rendu persistant car la classification qui suit est un processus itératif.

Examinez le nombre de vecteurs obtenus :

sent2vec.count()

Quel est l’effet de .count() ici ?

Classification automatique : K-means avec initialisation par K-means||

Spark MLlib propose une implémentation de l’algorithme de classification automatique k-means. L’initialisation des centres est réalisée par l’algorithme parallèle k-means||, avec 5 itérations (initializationSteps: 5, peut être modifié avec KMeans.setInitializationSteps).

La classification est obtenue en appelant KMeans.train, avec des paramètres dont la signification est :
  • data : données à classifier, sous forme de RDD[Vector]

  • k : nombre de groupes (clusters)

  • maxIterations : nombre maximal d’itérations ; arrêt de l’exécution lorsque les centres sont stabilisés (à epsilon: 1e-4 près) ou maxIterations est atteint ; epsilon peut être modifié en appelant KMeans.epsilon

  • runs : nombre d’exécutions (1 par défaut) ; c’est le meilleur modèle qui est retourné

  • initializationMode : mode d’initialisation des centres, soit random soit k-means|| (par défaut)

  • seed : initialisation du générateur de nombres aléatoires

Pour réaliser la classification automatique des vecteurs de sent2vec, entrez dans spark-shell les commandes suivantes :

val nbClusters = 5
val nbIterations = 200
val clustering = KMeans.train(sent2vec, nbClusters, nbIterations)

La classification peut prendre un certain temps sur votre ordinateur, profitez-en pour lire la suite.

Affichage des résultats

Quelle est la signification des regroupements (clusters) obtenus ? Pour l’examiner, une solution simple est de considérer les vecteurs des centres de groupes et de retourner, pour chacun de ces vecteurs, les mots du vocabulaires dont les représentations Word2Vec sont les plus proches :

clustering.clusterCenters.foreach(clusterCenter => {
    w2vModel.findSynonyms(clusterCenter,5).foreach(synonym => print(" %s (%5.3f),"
            .format(synonym._1, synonym._2)))
    println()
})

Que constatez-vous ? Tous ces mots font-ils partie des tweets analysés ? Réfléchissez à d’autres méthodes permettant de mieux comprendre le contenu des clusters résultants.

Comment obtenir le corpus de tweets

Pour obtenir un corpus de tweets, il est nécessaire de réceptionner un flux de Twitter et le filtrer (par hashtags, par langue, etc.).

Afin de pouvoir recevoir des données à partir de votre application, il est nécessaire d’être certifié (obtenir les credentials), suivez ce guide (« 1.2. Twitter Credential Setup »).

Les outils nécessaires pour accéder au flux Twitter doivent être inclus dans une archive .jar, créée avec l’utilitaire SBT à partir des sources qui se trouvent dans le fichier $SPARK_HOME/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala (à copier dans votre répertoire tptweets) et du fichier build.sbt suivant :

name := "TwitterPopularTags"

version := "1.0"

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
   {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
   }
}

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.1" exclude("org.twitter4j", "twitter4j")

libraryDependencies += "org.twitter4j" % "twitter4j-stream" % "3.0.6"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

(conserver les lignes vides !)

Nous avons remplacé ici la version 3.0.3 de twitter4j (utilisée par défaut dans Spark 1.6.1) par la version 3.0.6 car à partir de cette dernière nous avons accès au filtrage par la langue (status.getLang).

Pour l’installation et l’utilisation de SBT vous avez quelques indications ici. Dans le cas présent, il faudra lancer SBT avec sbt assembly (et non sbt package).

Lancez ensuite spark-shell avec spark-shell --master local[3] --jars target/scala-2.10/TwitterPopularTags.jar et entrez :

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._

// un Array de strings peut servir à filtrer les tweets
//val filters = Array("")

// préciser les OAuth credentials obtenus au préalable
val consumerKey = "..."
val consumerSecret = "..."
val accessToken = "..."
val accessTokenSecret = "..."
System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

// tranches de 60 secondes
val ssc = new StreamingContext(sc, Seconds(60))

val tweetStream = TwitterUtils.createStream(ssc, None)
//val tweetStream = TwitterUtils.createStream(ssc, None, filters)

// test : afficher le nombre de tweets en anglais reçus par tranche
//tweetStream.foreachRDD((rdd,time) => {
//    println("%s at time %s".format(rdd.filter(status =>
//        status.getLang.startsWith("en"))
//              .map(status => status.getText)
//              .count(),time.milliseconds))})

// filtrer par lang=en, enregistrer chaque RDD (tranche du flux) dans un
//  nouveau répertoire ; nécessaire d'inclure la version >= 3.0.6 de twitter4j
//   dans le jar à la place de la version par défaut 3.0.3 (sans getLang)
tweetStream.foreachRDD((rdd,time) =>
    rdd.filter(status =>
        status.getLang.startsWith("en"))
              .map(status => status.getText)
              .saveAsTextFile("tweets"+(time.milliseconds.toString)))

// démarrer le traitement du flux
ssc.start()

// arrêter (gentiment) le traitement du flux mais pas le spark context
ssc.stop(false,true)

Comment obtenir le modèle Word2Vec

Pour obtenir un modèle Word2Vec, il faut suivre le guide. Lors d’une exécution en local, il est nécessaire d’augmenter la borne supérieure pour la mémoire allouée à la machine virtuelle java (spark-shell --driver-memory 4g ...) ou, si la quantité de mémoire disponible n’est pas suffisante, réduire la taille du fichier de textes. Attention toutefois à ne pas le réduire trop (à moins d’un quart du fichier de départ) car la qualité des représentations vectorielles résultantes serait trop faible.

import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}

val input = sc.textFile("text8").map(line => line.split(" ").toSeq)

val word2vec = new Word2Vec()
val w2vModel = word2vec.fit(input)

// enregistrer le modèle
w2vModel.save(sc, "w2vModel")

Questions

Question :

Variez (surtout à la hausse…) le nombre de centres pour la classification automatique, examinez la « lisibilité » des clusters.