Travaux pratiques - Fouille de réseaux sociaux, troisième partie

Références externes utiles :

GraphX avancé

Dans ce TP, nous allons faire de la fouille de données appliquée à des données statistiques sur les matchs de basket-ball du championnat universitaire américain (NCAA). Comme pour la plupart des sports américains, le basket est très observé et les phases de jeu sont décomposées, de façon à créer des statistiques. Celles-ci sont abondamment analysées, et l’on utilise même l’acronyme « APBRmetrics » (Association for Professional Basketball Research) pour désigner ces études.

Nous allons mettre en œuvre GraphX pour constituer un graphe d’équipes. Puis, en faisant passer des messages et en agrégeant les résultats, nous obtiendrons les statistiques dont nous souhaitons disposer.

Les données

Voici une archive contenant les données. Nous allons cette fois charger des données csv. Il y aura deux fichiers :

  • un fichier teams.csv, contenant les informations sur les équipes. Il ne contient que deux colonnes, l’une pour l’identifiant numérique d’une équipe, le second pour son nom.

  • un fichier stats.csv contenant les résultats des matchs et les statistiques associées. Il contient une ligne pour chaque match de la saison. Beaucoup de colonnes commencent par « w » (pour désigner l’équipe qui a gagné, winning), beaucoup d’autres par « l » (pour l’équipe qui a perdu, losing).

Voici la liste des colonnes du fichier stats.csv:

  • season : année (2015), constante dans nos données

  • daynum : jour du match dans la saison, depuis le début de celle-ci

  • wteam : équipe gagnante

  • wscore : score des gagnants

  • lteam : équipe perdante

  • lscore : score des perdants

  • wloc : est-ce que l’équipe gagnante jouait à domicile (H), à l’extérieur (A), sur terrain neutre (N)

  • numot : nombre de périodes de prolongations

  • wfgm : lancers réussis (équipe gagnante)

  • wfga : lancers tentés (équipe gagnante)

  • wfgm3 : lancers à 3 points réussis (équipe gagnante)

  • wfga3 : lancers à 3 points tentés (équipe gagnante)

  • wftm : lancers francs réussis (équipe gagnante)

  • wfta : lancers francs tentés (équipe gagnante)

  • wor : rebonds offensifs (équipe gagnante)

  • wdr : rebonds défensifs (équipe gagnante)

  • wast : passes (équipe gagnante)

  • wto : pertes de balle (équipe gagnante)

  • wstl : récupérations de balle (équipe gagnante)

  • wblk : blocs (équipe gagnante)

  • wpf : fautes personnelles (équipe gagnante)

  • lfgm : lancers réussis (équipe perdante)

  • lfga : lancers tentés (équipe perdante)

  • lfgm3 : lancers à 3 points réussis (équipe perdante)

  • lfga3 : lancers à 3 points tentés (équipe perdante)

  • lftm : lancers francs réussis (équipe perdante)

  • lfta : lancers francs tentés (équipe perdante)

  • lor : rebonds offensifs (équipe perdante)

  • ldr : rebonds défensifs (équipe perdante)

  • last : passes (équipe perdante)

  • lto : pertes de balle (équipe perdante)

  • lstl : récupérations de balle (équipe perdante)

  • lblk : blocs (équipe perdante)

  • lpf : fautes personnelles (équipe perdante)

Le score d’une équipe est donc (le nombre de lancers à 3 pts est une partie du nombre total de lancers) : s = wfgm * 2 + wfgm3 + wftm

On va télécharger les données, puis les décompresser dans un dossier :

mkdir ~/tpbasket && cd ~/tpbasket
wget http://cedric.cnam.fr/vertigo/Cours/RCP216/docs/tpbasket.zip
unzip tpbasket.zip
ls # pour vérifier la présence des deux fichiers csv attendus

Création du graphe

On commence par lancer spark, puis faire l’import des RDD et de GraphX.

spark-shell
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._

On va commencer par créer une classe GameStats, permettant de stocker les statistiques d’une équipe pour un match donné :

case class GameStats(
   val score: Int,
   val fieldGoalMade:   Int,
   val fieldGoalAttempt: Int,
   val threePointerMade: Int,
   val threePointerAttempt: Int,
   val threeThrowsMade: Int,
   val threeThrowsAttempt: Int,
   val offensiveRebound: Int,
   val defensiveRebound: Int,
   val assist: Int,
   val turnOver: Int,
   val steal: Int,
   val block: Int,
   val personalFoul: Int
) {
// Quelques pourcentages usuels
def fgPercent: Double = 100.0 * fieldGoalMade / fieldGoalAttempt
def tpPercent: Double = 100.0 * threePointerMade / threePointerAttempt
def ftPercent: Double = 100.0 * threeThrowsMade / threeThrowsAttempt
override def toString: String = "Score: " + score
}

Et on va créer aussi deux classes, pour les résultats des matchs :

abstract class GameResult(val season:Int, val day:Int, val loc:String) extends Serializable

case class FullResult(
    override val season:    Int,
    override val day:       Int,
    override val loc:       String,
    val winnerStats:       GameStats,
    val loserStats:         GameStats
) extends GameResult(season, day, loc)

Nous pouvons ensuite préparer la création d’un graphe contenant les résultats des équipes lors des matchs de la saison. Les nœuds du graphe sont les équipes, et chaque arête correspond à un match.

On crée donc le RDD des sommets, teams:

val teams: RDD[(VertexId, String)] =
sc.textFile("/data/teams.csv").
filter(! _.startsWith("#")).
map {line =>
  val row = line split ','
    (row(0).toInt, row(1))
}

Comme d’habitude, vérifiez le contenu du RDD :

teams.take(3).foreach{println}

On crée ensuite le RDD des arêtes, avec les résultats depuis stats.csv. Chaque arête contient les identifiants des deux extrémités (l’id de l’équipe gagnante et celui de l’équipe perdante), puis un FullResults avec toutes les statistiques de la rencontre.

val detailedStats: RDD[Edge[FullResult]] =
sc.textFile("/data/stats.csv").
filter(! _.startsWith("#")).
map {line =>
  val row = line split ','
    Edge(row(2).toInt, row(4).toInt,
        FullResult(
          row(0).toInt, row(1).toInt,
          row(6),
          GameStats(
            score = row(3).toInt,
            fieldGoalMade = row(8).toInt,
            fieldGoalAttempt = row(9).toInt,
            threePointerMade = row(10).toInt,
            threePointerAttempt = row(11).toInt,
            threeThrowsMade = row(12).toInt,
            threeThrowsAttempt = row(13).toInt,
            offensiveRebound = row(14).toInt,
            defensiveRebound = row(15).toInt,
            assist = row(16).toInt,
            turnOver = row(17).toInt,
            steal = row(18).toInt,
            block = row(19).toInt,
            personalFoul = row(20).toInt
            ),
          GameStats(
            score = row(5).toInt,
            fieldGoalMade = row(21).toInt,
            fieldGoalAttempt = row(22).toInt,
            threePointerMade = row(23).toInt,
            threePointerAttempt = row(24).toInt,
            threeThrowsMade = row(25).toInt,
            threeThrowsAttempt = row(26).toInt,
            offensiveRebound = row(27).toInt,
            defensiveRebound = row(28).toInt,
            assist = row(20).toInt,
            turnOver = row(30).toInt,
            steal = row(31).toInt,
            block = row(32).toInt,
            personalFoul = row(33).toInt
            )
            )
            )
}

Vérifiez que tout s’est bien passé, puis créez le graphe :

detailedStats.take(3).foreach(println)
val scoreGraph = Graph(teams, detailedStats)

Question :

Remarquez la construction particulière du graphe (sens des arêtes). Quel bénéfice immédiat peut-on en tirer pour l’analyse ? Essayez par exemple avec « Duke ».

Question :

Avec ce qui a été vu dans les TP précédents, calculez quelques statistiques sur ce graphe.

Statistiques des vainqueurs

On commence par une statistique simple : quelle est la moyenne des lancers (dans le jeu, donc hors lancers francs) marqués par les équipes qui ont remporté les matchs ?

Pour cela, il nous faut, pour chaque équipe, le nombre de matchs gagnés et le nombre de lancers que ses joueurs avaient tenté.

On va effectuer le calcul avec l’opérateur aggregateMessages de GraphX, qui repose sur l’envoi et la réception de message. On utilise d’abord une fonction SendMsg, qui se charge d’envoyer des messages. Elle utilise l’attribut de l’arête, ainsi que ceux des sommets aux extrémités. Elle peut passer des messages vers la destination (sendToDst) ou vers la source (sendToSrc). Une deuxième fonction, mergeMsg, agrège les messages deux à deux, avec un autre Message en sortie (cf Map/Reduce). La sortie d”aggregateMessages est un VertexRDD[Msg].

type Msg = (Int, Int)
type Context = EdgeContext[String, FullResult, Msg]
val winningFieldGoalMade: VertexRDD[Msg] = scoreGraph.aggregateMessages(
   // sendMsg
   (ec: Context) => ec.sendToSrc(1, ec.attr.winnerStats.fieldGoalMade),
   // mergeMsg
   (x: Msg, y: Msg) => (x._1 + y._1, x._2+ y._2)
)

On peut ensuite calculer la statistique qui nous intéresse :

val avgWinningFieldGoalMade: VertexRDD[Double] =
   winningFieldGoalMade mapValues (
       (id: VertexId, x: Msg) => x match {
           case (count: Int, total: Int) => total.toDouble/count
})

avgWinningFieldGoalMade.take(5).foreach(println)
// top 5 trié
avgWinningFieldGoalMade.take(5).toSeq.sortBy(_._2).reverse.foreach(println)

Question :

Refaites la même chose pour le nombre moyen de points marqués par les équipes victorieuses.

D’autres statistiques de victoire

Pour permettre de développer facilement de nombreux calculs sur le graphe, on va utiliser des fonctions Scala de haut niveau, afin d’avoir un niveau d’abstraction plus élevé.

On va voir cela avec l’exemple des nombres de lancers à 3 points et du pourcentage moyen de réussite à 3 points. On crée des fonctions qui ont pour signature GameStats => Double.

def threePointMade(stats: GameStats) = stats.threePointerMade
def threePointPercent(stats: GameStats) = stats.tpPercent

Ces fonctions sont réutilisées dans cette fonction générique, qui prend en paramètre l’une de ces fonctions, et permet le calcul de la statistique concernée :

def averageWinnerStat(graph: Graph[String, FullResult])(getStat:
GameStats => Double): VertexRDD[Double] = {
   type Msg = (Int, Double)
   val winningScore: VertexRDD[Msg] =
   graph.aggregateMessages[Msg](
       // sendMsg
       triplet => triplet.sendToSrc(1, getStat(triplet.attr.winnerStats)),
       // mergeMsg
       (x, y) => (x._1 + y._1, x._2+ y._2)
   )
   winningScore mapValues (
       (id: VertexId, x: Msg) => x match {
           case (count: Int, total: Double) => total/count
       })
}

On peut ainsi calculer facilement ce que l’on cherchait, et trouver quelles équipes ont été les meilleures sur chaque aspect durant la saison :

val winnersThreePointMade = averageWinnerStat(scoreGraph)(threePointMade)
val winnersThreePointPercent = averageWinnerStat(scoreGraph)(threePointPercent)
winnersThreePointMade.sortBy(_._2,false).take(5).foreach(println)
winnersThreePointPercent.sortBy(_._2,false).take(5).foreach(println)

Statistiques sur tous les matchs

On va maintenant agréger l’information pour tous les matchs, et plus seulement les victoires de chaque équipe. Pour rendre notre abstraction encore plus flexible, on va modifier notre fonction averageWinnerStat, en une averageStat qui prendra en une classe en paramètre afin de préciser les nœuds que l’on souhaite agréger dans notre calcul. Voici des types supplémentaires, et la nouvelle fonction :

trait Teams
case class Winners() extends Teams
case class Losers() extends Teams
case class AllTeams() extends Teams

def averageStat(graph: Graph[String, FullResult])(getStat: GameStats => Double, tms: Teams): VertexRDD[Double] = {
   type Msg = (Int, Double)
   val aggrStats: VertexRDD[Msg] = graph.aggregateMessages[Msg](
       // sendMsg
       tms match {
           case _ : Winners => t => t.sendToSrc((1, getStat(t.attr.winnerStats)))
           case _ : Losers => t => t.sendToDst((1, getStat(t.attr.loserStats)))
           case _       => t => {
               t.sendToSrc((1, getStat(t.attr.winnerStats)))
               t.sendToDst((1, getStat(t.attr.loserStats)))
           }
       }
       ,
       // mergeMsg
       (x, y) => (x._1 + y._1, x._2+ y._2)
   )
   aggrStats mapValues (
       (id: VertexId, x: Msg) => x match {
           case (count: Int, total: Double) => total/count
           })
   }

Comme précédemment, on peut utiliser cette fonction pour calculer les nombres de lancers à 3 points de chaque équipe, ainsi que les pourcentages associés :

val allThreePointMade = averageStat(scoreGraph)(threePointMade(_), AllTeams())
allThreePointMade.sortBy(_._2, false).take(5).foreach(println)
val allThreePointPercent = averageStat(scoreGraph)(threePointPercent(_), AllTeams())
allThreePointPercent.sortBy(_._2,false).take(5).foreach(println)

On peut bien sûr utiliser averageStat pour calculer les nombres moyens de points pour les équipes victorieuses. Regardons notamment les équipes avec le plus de points en moyenne et le moins de points en moyenne :

val winnerAvgPPG = averageStat(scoreGraph)(_.score, Winners())
winnerAvgPPG.max()(Ordering.by(_._2))
winnerAvgPPG.min()(Ordering.by(_._2))

Question :

Que concluez-vous ? (conclusion « sportive »)

Regardons maintenant la moyenne des points par match, en incluant victoires et défaites :

val allAvgPPG = averageStat(scoreGraph)(_.score, AllTeams())
allAvgPPG.max()(Ordering.by(_._2))
allAvgPPG.min()(Ordering.by(_._2))

Question :

Que concluez-vous ? (conclusion « sportive »)

Défense

Après des calculs sur ce que les équipes marquent, regardons quelques statistiques sur ce qu’elles laissent leurs adversaires marquer. On crée une fonction averageConcededStat, qui envoie les loserStats à l’équipe gagnante et envoie les winnerStats à l’équipe perdante.

On en profite pour ajouter un détail : le nom des équipes dans les messages (on aurait pu le faire auparavant, bien sûr).

Question :

En quoi cela diffère-t-il de ce qu’on a vu précédemment ? Lisez bien le code de la fonction.

def averageConcededStat(graph: Graph[String, FullResult])(getStat:
GameStats => Double, rxs: Teams): VertexRDD[(String, Double)] = {
   type Msg = (Int, Double, String)
   val aggrStats: VertexRDD[Msg] = graph.aggregateMessages[Msg](
       // sendMsg
       rxs match {
           case _ : Winners => t => t.sendToSrc((1,
           getStat(t.attr.loserStats), t.srcAttr))
           case _ : Losers => t => t.sendToDst((1,
           getStat(t.attr.winnerStats), t.dstAttr))
           case _       => t => {
               t.sendToSrc((1,
               getStat(t.attr.loserStats),t.srcAttr))
               t.sendToDst((1,
               getStat(t.attr.winnerStats),t.dstAttr))
           }
       }
       ,
       // mergeMsg
       (x, y) => (x._1 + y._1, x._2+ y._2, x._3)
   )
   aggrStats mapValues (
       (id: VertexId, x: Msg) => x match {
           case (count: Int, total: Double, name: String) =>
           (name, total/count)
       })
}

Et l’on peut calculer les nombres de points concédés, par les vainqueurs ou les perdants :

val winnersAvgConcededPoints = averageConcededStat(scoreGraph)(_.score, Winners())
val losersAvgConcededPoints = averageConcededStat(scoreGraph)(_.score, Losers())

losersAvgConcededPoints.min()(Ordering.by(_._2))
winnersAvgConcededPoints.min()(Ordering.by(_._2))
losersAvgConcededPoints.max()(Ordering.by(_._2))
winnersAvgConcededPoints.max()(Ordering.by(_._2))

Question :

Que concluez-vous ? (conclusion « sportive »)

Agrégation complète

On a vu jusqu’à présent à quel point aggregateMessages était flexible et puissant : on peut définir les messages que l’on passe, sélectionner les nœuds qui reçoivent les messages et finalement définir comment l’on agrége ces messages. Nous allons maintenant agréger plusieurs statistiques et les stocker dans les nœuds du graphe.

On crée d’abord une classe dédiée aux statistiques d’une équipe :

// Average Stats of All Teams
case class TeamStat(
       wins: Int = 0     // Number of wins
     ,losses: Int = 0     // Number of losses
       ,ppg: Int = 0     // Points per game
      ,pcg: Int = 0     // Points conceded per game
       ,fgp: Double = 0   // Field goal percentage
       ,tpp: Double = 0   // Three point percentage
       ,ftp: Double = 0   // Free Throw percentage
     ){
   override def toString = wins + "-" + losses
}

Maintenant, nous allons collecter les statistiques de chaque équipe avec aggregateMessages, en recourant à un message qui va conserver 8 informations, relatives aux matchs gagnés, perdus et les teamStat vues auparavant :

type Msg = (Int, Int, Int, Int, Int, Double, Double, Double)
val aggrStats: VertexRDD[Msg] = scoreGraph.aggregateMessages(
       // sendMsg
       t => {
               t.sendToSrc((   1,
                              1, 0,
                               t.attr.winnerStats.score,
                               t.attr.loserStats.score,
                               t.attr.winnerStats.fgPercent,
                               t.attr.winnerStats.tpPercent,
                               t.attr.winnerStats.ftPercent
                           ))
               t.sendToDst((   1,
                               0, 1,
                               t.attr.loserStats.score,
                              t.attr.winnerStats.score,
                               t.attr.loserStats.fgPercent,
                               t.attr.loserStats.tpPercent,
                               t.attr.loserStats.ftPercent
                           ))
           }
       ,
       // mergeMsg
       (x, y) => ( x._1 + y._1, x._2 + y._2,
                   x._3 + y._3, x._4 + y._4,
                   x._5 + y._5, x._6 + y._6,
                   x._7 + y._7, x._8 + y._8
               )
   )

On agrège ces aggrStats et l’on mappe vers une collection de TeamStat :

val teamStats: VertexRDD[TeamStat] = aggrStats mapValues {
       (id: VertexId, m: Msg) => m match {
           case ( count: Int,
                   wins: Int,
                losses: Int,
                 totPts: Int,
             totConcPts: Int,
                   totFG: Double,
                   totTP: Double,
                   totFT: Double) => TeamStat( wins, losses,
                                              totPts/count,
                                               totConcPts/count,
                                               totFG/count,
                                               totTP/count,
                                               totFT/count)
       }
}

On change maintenant le type des nœuds du graphe, toujours avec des équipes mais disposant de notre nouvel attribut TeamStat:

case class Team(name: String, stats: Option[TeamStat]) {
   override def toString = name + ": " + stats
}

Et l’on fusionne les statistiques agrégées dans les attributs des sommets :

// Joining the average stats to vertex attributes
def addTeamStat(id: VertexId, t: Team, stats: TeamStat) =
Team(t.name, Some(stats))
val statsGraph: Graph[Team, FullResult] =
   scoreGraph.mapVertices((_, name) => Team(name, None)).
               joinVertices(teamStats)(addTeamStat)

On peut vérifier ce que contiennent les nouveaux nœuds :

statsGraph.vertices.take(3).foreach(println)

Et l’on va terminer en cherchant à calculer le top10 des meilleures équipes de la saison régulière, c’est-à-dire celles qui disposent de la meilleure différence entre leur nombre de victoire et leur nombre de défaites :

import scala.math.Ordering
import scala.reflect.classTag
import scala.reflect.ClassTag

object winsOrdering extends Ordering[Option[TeamStat]] {
   def compare(x: Option[TeamStat], y: Option[TeamStat]) = (x, y)
   match {
       case (None, None)       => 0
       case (Some(a), None)   => 1
       case (None, Some(b))   => -1
       case (Some(a), Some(b)) => if (a.wins == b.wins) a.losses compare b.losses
                                   else a.wins compare b.wins
   }}

statsGraph.vertices.sortBy(v => v._2.stats,false)(winsOrdering, classTag[Option[TeamStat]]).take(10).foreach(println)

Bibliographie

  • Rindra Ramamonjison, Apache Spark Graph Processing, Packt Publishing, 2015.

  • Karau, H., A. Konwinski, P. Wendell et M. Zaharia, Learning Spark, O’Reilly, 2015.