.. _chap-tpGraphes:
###############################################################
Travaux pratiques - Fouille de réseaux sociaux, première partie
###############################################################
Références externes utiles :
* `Documentation Spark `_
* `Documentation API Spark en Scala `_
* `Documentation langage Scala `_
************
Introduction
************
Depuis quelques dizaines d'années, des chercheurs de disciplines aussi variées
que les neurosciences, la sociologie, ou les sciences politiques se sont aperçus
qu'ils partageaient le besoin de comprendre finement des `relations` entre
différentes entités `organisées en réseaux` (portions cérébrales, individus,
institutions, etc.). Grâce à des collaborations avec des chercheurs en
mathématiques et informatique, il est devenu peu à peu évident que les
techniques utilisées dans chaque domaine pouvaient s'appliquer aux autres : le
champ de la `network science` était né.
Cette dernière repose sur les outils de la `théorie des graphes`, branche des
mathématiques née au XVIIIe siècle avec Euler et le `problème des sept ponts de
Königsberg
`_.
Celle-ci se concentre sur l'étude des propriétés d'objets particuliers, les
`graphes`, composés d'entités (représentés par des `noeuds` ou des `sommets`)
qui sont mises en relation (par des `liens` ou des `arêtes`). La théorie des
graphes a également, comme on le voit dans le cours, des applications multiples
en informatique, du routage dans l'Internet aux systèmes de recommandations, en
passant par le PageRank (à l'origine de Google) et les réseaux sociaux.
Dans ces deux séances de travaux pratiques, nous allons étudier une librairie de
Spark appelée GraphX, qui étend les fonctionnalités de Spark avec des
algorithmes parallélisés pour traiter les problèmes de graphes. Bien qu'elle ne
soit pas forcément la plus rapide comparée à certains `frameworks` dédiés aux
graphes, elle présente l'avantage de fonctionner au sein de Spark et en rend
l'utilisation d'autant plus facile. On peut ainsi combiner, comme on va le
faire, des traitements classiques et des traitements spécifiques aux graphes
sous-jacents dans nos données.
Objectif du TP
--------------
Nous allons travailler sur des articles scientifiques, décrits par des
mots-clefs (ou `tags`) et étudier le graphe de ces tags (qui seront reliés si un
ou plusieurs articles les possèdent en commun). Notre but sera de comprendre la
structure et les propriétés du graphe. Dans cette première partie, nous allons
commencer par une analyse simple des *sujets principaux* des articles et de
leurs co-occurrences, ce qui ne nécessitera pas le recours à GraphX. On
recherchera ensuite les `composantes connexes`, pour voir si les citations
forment des petits groupes ou si, au contraire, elles sont toutes reliées. Nous
aborderons ensuite la question de la distribution des degrés dans le graphe.
Dans la séance de travaux pratiques de la semaine prochaine, nous poursuivrons
notre exploration de ce graphe.
Description des données
-----------------------
Nous allons récupérer des données textuelles issues d'une base de données de
publications scientifiques spécialisées en médecine et sciences du vivant,
MEDLINE. Celle-ci est gérée par l'équivalent américain de l'INSERM, la NIH. La
base disponible en ligne depuis 1996 et contient plus de 20 millions d'articles.
En raison du volume important et de la fréquence élevée des mises à jour, la
communauté scientifique a développé un ensemble de tags sémantiques appelé MeSH
(Medical Subject Headings), appliqué à toutes les citations de l'index. Ils
permettent donc d'étudier les relations entre les documents, afin de faciliter
le processus de revue par les pairs et la recherche d'information.
Nous utilisons dans la suite un sous-ensemble des données de citations de
MEDLINE et allons étudier le réseau des tags MeSH.
Récupération des données
------------------------
On va récupérer quelques fichiers contenant des échantillons de l'index des
citations sur le serveur FTP du NIH.
.. code-block:: bash
[cloudera@quickstart ~]$ mkdir -p tpgraphes/medline_data
[cloudera@quickstart ~]$ cd tpgraphes/medline_data
[cloudera@quickstart medline_data]$ wget ftp://ftp.nlm.nih.gov/nlmdata/sample/medline/*.gz
Décompressons les fichiers et regardons leur taille.
.. code-block:: bash
[cloudera@quickstart medline_data]$ gunzip *.gz
[cloudera@quickstart medline_data]$ ls -ltr
Après décompression, il y a en tout près de 600 Mo de données au format XML.
Regardez les données, avec un éditeur de texte (comme `gedit` par exemple).
Chaque entrée est un enregistrement appelé MedlineCitation, qui contient les
informations relatives à la publication de l'article, telles que : le nom du
journal, le numéro, la date, les noms des auteurs, et un ensemble de mots-clefs
MeSH. Chacun de ces mots a un attribut pour dire s'il s'agit d'un *sujet
principal* (MajorTopic, en anglais) de l'article ou non. Exemple :
.. code-block:: xml
12255379
1980
01
03
...
...
Intelligence
Maternal-Fetal Exchange
...
...
Lors du TP sur la fouille de texte, nous étions intéressés à l'étude du texte
non structuré. Ici, nous allons examiner les valeurs contenues dans le champ
DescriptorName en `parsant` le XML directement. Scala fournit un très bon
support d'XML, nous allons en profiter.
Retrouvez le fichier *lsa.jar* du TP sur la fouille de texte (récupérez-le
:ref:`avec la dernière commande de ce bloc `), copiez-le dans le dossier
``tpgraphes`` et ouvrez spark-shell ainsi :
.. code-block:: bash
[cloudera@quickstart medline_data]$ cd ..
[cloudera@quickstart tpgraphes]$ wget http://cedric.cnam.fr/~ferecatu/RCP216/tp/tptexte/lsa.jar
[cloudera@quickstart tpgraphes]$ spark-shell --driver-memory 1g --jars lsa.jar
Avec Spark, on commence par importer les paquetages nécessaires :
.. code-block:: scala
scala> import com.cloudera.datascience.common.XmlInputFormat
scala> import org.apache.spark.SparkContext
scala> import scala.xml._
scala> import org.apache.hadoop.io.{Text, LongWritable}
scala> import org.apache.hadoop.conf.Configuration
scala> import org.apache.spark.rdd.RDD
Puis, on crée une fonction chargée d'extraire les entrées dans les fichiers
XML contenus dans le dossier ``medline_data``.
.. code-block:: scala
def loadMedline(sc: SparkContext, path: String) = {
@transient val conf = new Configuration()
conf.set(XmlInputFormat.START_TAG_KEY, "")
val in = sc.newAPIHadoopFile(path, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
in.map(line => line._2.toString)
}
scala> val medline_raw = loadMedline(sc, "file:///home/cloudera/tpgraphes/medline_data")
Regardez ensuite ce que vous avez récupéré dans medline_raw.
.. code-block:: scala
scala> val raw_xml = medline_raw.take(1)(0)
scala> val elem = XML.loadString(raw_xml)
Avec Scala, la manipulation de l'arbre XML est assez aisée. L'opérateur "\\"
permet d'obtenir les enfants directs d'un nœud par leur nom. Les enfants
indirects ce sera avec "\\\\". Essayez avec :
.. code-block:: scala
scala> elem \ "MeshHeadingList"
scala> (elem \\ "DescriptorName").map(_.text)
On va donc, une fois ce mécanisme compris, pouvoir récupérer les *sujets
principaux* des articles avec la fonction suivante :
.. code-block:: scala
def majorTopics(elem: Elem): Seq[String] = {
val dn = elem \\ "DescriptorName"
val mt = dn.filter(n => (n \ "@MajorTopicYN").text == "Y")
mt.map(n => n.text)
}
scala> majorTopics(elem)
Appliquons-la à toutes nos données et extrayons les sujets :
.. code-block:: scala
scala> val mxml: RDD[Elem] = medline_raw.map(XML.loadString)
scala> val medline: RDD[Seq[String]] = mxml.map(majorTopics).cache()
scala> medline.count()
scala> val topics: RDD[String] = medline.flatMap(mesh => mesh)
scala> topics.take(10)
Co-occurrences des sujets principaux de MeSH
--------------------------------------------
Nous avons maintenant extrait les tags MeSH des données MEDLINE, nous allons
commencer par calculer quelques statistiques simples, comme le nombre
d'enregistrements et un histogramme des fréquences des *sujets principaux*.
.. code-block:: scala
scala> medline.count()
scala> val topicCounts = topics.countByValue()
scala> topicCounts.size
scala> val tcSeq = topicCounts.toSeq
scala> tcSeq.sortBy(_._2).reverse.take(10).foreach(println)
.. il y a une longue queue à la distribution du nombre de topic par documents
.. scala> val valueDist = topicCounts.groupBy(_._2).mapValues(_.size)
.. scala> valueDist.toSeq.sorted.take(10).foreach(println)
Nous cherchons ici les co-occurrences des sujets principaux. Chaque entrée de
`medline` est une liste de chaînes de caractères qui sont les sujets attribués à
chaque article. Il nous faut donc générer tous les sous-ensembles de deux
éléments de ces listes. Pour cela, Scala nous fournit heureusement une méthode
appelée ``combinations``, qui permet d'obtenir le résultat escompté. Voici, sur
un exemple simple, comment utiliser cette méthode :
.. code-block:: scala
scala> val list = List(1, 2, 3)
scala> val combs = list.combinations(2)
scala> combs.foreach(println)
On doit cependant se méfier : les listes retournées par `combinations` tiennent
compte de l'ordre des éléments, et deux listes comportant les mêmes éléments
dans le désordre ne sont pas égales :
.. code-block:: scala
scala> val combs = list.reverse.combinations(2)
scala> combs.foreach(println)
scala> List(3, 2) == List(2, 3)
Il faudra donc que l'on s'assure que les listes de sujets sont triées avant
d'appeler `combinations`, d'où le `sorted` ci-dessous.
.. code-block:: scala
scala> val topicPairs = medline.flatMap(t => t.sorted.combinations(2))
scala> val cooccurs = topicPairs.map(p => (p, 1)).reduceByKey(_+_)
scala> cooccurs.cache()
scala> cooccurs.count()
.. admonition:: Question :
Quel est le nombre total de co-occurrences possible ? Combien en obtenez-vous ?
Regardons quelles sont les paires les plus fréquentes :
.. code-block:: scala
scala> val ord = Ordering.by[(Seq[String], Int), Int](_._2)
scala> cooccurs.top(10)(ord).foreach(println)
******************************************************
Construction d'un graphe de co-occurrences avec GraphX
******************************************************
Les outils standard d'analyse nous fournissent donc pour l'instant assez peu
d'informations sur les relations entre les mots-clefs. Les paires qui
apparaissent le plus (ou le moins fréquemment) sont souvent celles qui nous
importent le moins. On va donc maintenant changer de paradigme de penser et voir
les différents sujets comme les sommets d'un graphe, et l'existence d'un article
comportant deux sujets nous permettra d'établir une arête entre ces deux sujets.
On pourra ainsi calculer des métriques orientées graphes et mieux comprendre la
structure du réseau.
On utilise pour cela GraphX, qui requiert l'emploi d'un `Long` pour identifier
chacun des sommets de notre graphe. Comme nous disposons pour le moment
seulement de chaînes de caractères, une façon simple d'obtenir une valeur unique
pour chacune est d'utiliser une `fonction de hachage
`_ (ce qui présente
l'avantage de pouvoir être fait en distribué).
On travaille avec un graphe relativement petit pour le moment (environ 13 000
sujets), donc la probabilité de collision est encore assez faible avec 32 bits
(mais elle est de 70% pour 100 000 noeuds). On va donc utiliser du 64 bits, en
important une librairie fournie par Google :
.. code-block:: scala
import com.google.common.hash.Hashing
def hashId(str: String) = {
Hashing.md5().hashString(str).asLong()
}
On crée les listes d'identifiants et on vérifie l'unicité pour être sûrs de
l'absence de collisions.
.. code-block:: scala
scala> val vertices = topics.map(topic => (hashId(topic), topic))
scala> vertices.take(5)
scala> val uniqueHashes = vertices.map(_._1).countByValue()
scala> val uniqueTopics = vertices.map(_._2).countByValue()
scala> uniqueHashes.size == uniqueTopics.size
On peut ensuite créer le graphe :
.. code-block:: scala
import org.apache.spark.graphx._
val edges = cooccurs.map(p => { val (topics, cnt) = p
val ids = topics.map(hashId).sorted
Edge(ids(0), ids(1), cnt)
})
val topicGraph = Graph(vertices, edges)
topicGraph.cache()
L'API de GraphX s'est occupée de convertir les RDD standards en VertexRDD et
EdgeRDD nécessaires, sans que l'on ait besoin d'enlever les doublons.
Vous pouvez regarder :
.. code-block:: scala
scala> topicGraph.vertices.count()
*********************************
Comprendre la structure du graphe
*********************************
Composantes connexes
--------------------
Une des choses éléentaires à regarder sur un graphe pour commencer d'en
comprendre la structure est de regarder sa `connexité`. Dans un graphe connexe,
il est possible d'atteindre un sommet depuis n'importe quel autre en suivant un
`chemin` (une séquence de sommets). Si le graphe n'est pas connexe, il est
divisé en plusieurs `composantes connexes`, des sous-parties du graphes qui sont
connexes, que l'on peut examiner individuellement. La connexité étant une
propriété importante, GraphX fournit une méthode directement.
.. code-block:: scala
scala> val connectedComponentGraph: Graph[VertexId, Int] = topicGraph.connectedComponents()
Les objets retournés ici sont de type `Graph`.
Voici une méthode pour les trier par taille.
.. code-block:: scala
def sortedConnectedComponents(connectedComponents: Graph[VertexId, _]): Seq[(VertexId, Long)] = {
val componentCounts = connectedComponents.vertices.map(_._2).countByValue
componentCounts.toSeq.sortBy(_._2).reverse
}
On peut maintenant examiner notre graphe et ses composantes connexes, d'abord
leur nombre puis la taille des 10 plus grandes.
.. code-block:: scala
scala> val componentCounts = sortedConnectedComponents( connectedComponentGraph)
scala> componentCounts.size
scala> componentCounts.take(10).foreach(println)
.. admonition:: Question :
Qu'observez-vous ? Qu'en concluez-vous ?
.. ifconfig:: tpscala in ('public')
.. admonition:: Correction :
On observe une composante connexe géante. Ce serait bien de comprendre
pourquoi il y a des composantes connexes non jointes. On essaie avec ce
qui suit.
.. code-block:: scala
val nameCID = topicGraph.vertices.innerJoin(connectedComponentGraph.vertices) {
(topicId, name, componentId) => (name, componentId)
}
val c1 = nameCID.filter(x => x._2._2 == topComponentCounts(1)._2)
c1.collect().foreach(x => println(x._2._1))
val hiv = topics.filter(_.contains("HIV")).countByValue()
Avec l'ajout d'un grand nombre d'articles à la base, le graphe des sujets
devient peu à peu connexe. Il n'y a pas de raison structurelle d'avoir des
composantes connexes distinctes, ce sont essentiellement des artefacts liés
à l'étiquetage/la saisie des informations.
Distribution de degrés
----------------------
Un graphe connecté peut prendre des formes multiples (étoile, boucle géante).
On essaie d'aller plus loin sur la compréhension du réseau de `tags`, en
examinant la distribution de degrés, c'est-à-dire le nombre de sommets qui sont
de degré :math:`n` pour :math:`n` entier.
.. code-block:: scala
scala> val degrees: VertexRDD[Int] = topicGraph.degrees.cache()
scala> degrees.map(_._2).stats()
Commentez les résultats.
.. admonition:: Question :
Il y a une différence entre le `count` obtenu ici et le nombre de sommets
calculé auparavant, pourquoi ?
.. ifconfig:: tpscala in ('public')
.. admonition:: Correction :
Réponse : certains nœuds ne sont pas connectés au reste, ils n'ont qu'un
seul sujet majeur, donc pas de co-occurrence avec d'autres.
.. code-block:: scala
val sing = medline.filter(x => x.size == 1)
sing.count()
...
48611
val singTopic = sing.flatMap(topic => topic).distinct()
singTopic.count()
8084
val topic2 = topicPairs.flatMap(p => p)
singTopic.subtract(topic2).count()
...
969
On retrouve 969 + 12065 = le nombre initial.
Il y a des noeuds de fort degrés, on se donne la méthode suivante pour obtenir
leurs noms.
.. code-block:: scala
def topNamesAndDegrees(degrees: VertexRDD[Int], topicGraph: Graph[String, Int]): Array[(String, Int)] = {
val namesAndDegrees = degrees.innerJoin(topicGraph.vertices) {
(topicId, degree, name) => (name, degree)
}
val ord = Ordering.by[(String, Int), Int](_._2)
namesAndDegrees.map(_._2).top(10)(ord)
}
On utilise cette méthode sur nos données :
.. code-block:: scala
scala> topNamesAndDegrees(degrees, topicGraph).foreach(println)
On retrouve les termes les plus fréquents, ils sont aussi les plus connectés. On
pourrait ensuite filtrer ces noeuds de très fort degré (connectés à tout le
monde, ils n'apportent que peu d'information), et observer si la structure du
graphe reste la même, si la composante connexe géante se désagrège petit à
petit, ou s'il y a un moment où l'on a enlever suffisamment de noeuds et que la
structure actuelle disparaît.
.. warning::
Ce sujet de TP se prolonge dans la séance :ref:`Travaux pratiques - Fouille de réseaux sociaux, deuxième partie `.
*************
Bibliographie
*************
Karau, H., A. Konwinski, P. Wendell et M. Zaharia, *Learning Spark*, O'Reilly, 2015.
Ryza, S., U. Laserson, S. Owen and J. Wills, *Advanced Analytics with Spark*, O'Reilly, 2010.
Large-Scale Structure of a Network of Co-Occurring MeSH Terms: Statistical
Analysis of Macroscopic Properties, by Kastrin et al. (2014),