Création du point d’entrée sur le serveur de calcul¶

In [ ]:
from pyspark.sql import SparkSession
spark  = SparkSession.builder.master("local[4]").getOrCreate()

Création des différents schémas¶

Pas forcement utile dans notre cas étant donné que les datasets ne sont pas très grands.

L'utilisation de inferSchema=true aurait suffit mais risque de prends plus de temps et n'est pas forcément fiable

In [ ]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType

schema_caracteristiques = StructType([
    StructField("Num_Acc", LongType()),
    StructField("an", IntegerType()),
    StructField("mois", IntegerType()),
    StructField("jour", IntegerType()),
    StructField("hrmn", IntegerType()),
    StructField("lum", IntegerType()),
    StructField("agg", IntegerType()),
    StructField("int", IntegerType()),
    StructField("atm", IntegerType()),
    StructField("col", IntegerType()),
    StructField("com", IntegerType()),
    #StructField("adr", StringType()),
    StructField("gps", StringType()),
    StructField("lat", IntegerType()),
    StructField("long", IntegerType()),
    StructField("dep", IntegerType()),
])

schema_vehicules = StructType([
    StructField("Num_Acc", LongType()),
    StructField("senc", IntegerType()),
    StructField("catv", IntegerType()),
    StructField("occutc", IntegerType()),
    StructField("obs", IntegerType()),
    StructField("obsm", IntegerType()),
    StructField("choc", IntegerType()),
    StructField("manv", IntegerType()),
    StructField("num_veh", StringType()),
])

schema_usagers = StructType([
    StructField("Num_Acc", LongType()),
    StructField("place", IntegerType()),
    StructField("catu", IntegerType()),
    StructField("grav", IntegerType()),
    StructField("sexe", IntegerType()),
    StructField("trajet", IntegerType()),
    StructField("secu", StringType()),
    StructField("locp", IntegerType()),
    StructField("actp", IntegerType()),
    StructField("etatp", IntegerType()),
    StructField("an_nais", IntegerType()),
    StructField("num_veh", StringType()),
])

schema_lieux = StructType([
    StructField("Num_Acc", LongType()),
    StructField("catr", IntegerType()),
    StructField("voie", StringType()),
    StructField("v1", IntegerType()),
    StructField("v2", StringType()),
    StructField("circ", IntegerType()),
    StructField("nbv", IntegerType()),
    StructField("pr", IntegerType()),
    StructField("pr1", IntegerType()),
    StructField("vosp", IntegerType()),
    StructField("prof", IntegerType()),
    StructField("plan", IntegerType()),
    StructField("lartpc", IntegerType()),
    StructField("larrout", IntegerType()),
    StructField("surf", IntegerType()),
    StructField("infra", IntegerType()),
    StructField("situ", IntegerType()),
    StructField("env1", IntegerType()),
])

Création des dataframes¶

In [ ]:
# On supprime la colonne adr contenant des chaînes de caractères avec un encodage pouvant poser problème
caracteristiques = spark.read.options(header=True, delimiter=",").schema(schema_caracteristiques).csv("./accidents/carac.txt")
#caracteristiques = caracteristiques.drop('adr')

vehicules = spark.read.options(header=True, delimiter=",").schema(schema_vehicules).csv("./accidents/vehicules*.csv")

usagers = spark.read.options(header=True, delimiter=",").schema(schema_usagers).csv("./accidents/usagers*.csv")

lieux = spark.read.options(header=True, delimiter=",").schema(schema_lieux).csv("./accidents/lieux*.csv")

Transformation des dataframes en rdd¶

In [ ]:
caracteristiques_rdd = caracteristiques.rdd
vehicules_rdd = vehicules.rdd
usagers_rdd = usagers.rdd
lieux_rdd = lieux.rdd

Affichages des tableaux¶

Caractéristiques¶

In [ ]:
caracteristiques.show(truncate=False, n=5)
print(f"shape: {caracteristiques.count()} x {len(caracteristiques.columns)}")
+------------+---+----+----+----+---+---+---+---+---+---+---+-------+------+---+
|Num_Acc     |an |mois|jour|hrmn|lum|agg|int|atm|col|com|gps|lat    |long  |dep|
+------------+---+----+----+----+---+---+---+---+---+---+---+-------+------+---+
|201100000001|11 |1   |22  |1400|1  |2  |1  |1  |3  |5  |M  |5053589|295262|590|
|201100000002|11 |6   |24  |1500|1  |1  |1  |1  |3  |11 |M  |5051652|293898|590|
|201100000003|11 |9   |16  |645 |2  |2  |1  |1  |6  |52 |M  |5051080|290322|590|
|201100000004|11 |9   |22  |1515|1  |2  |1  |1  |5  |11 |M  |5051861|293043|590|
|201100000005|11 |10  |24  |1545|1  |2  |1  |1  |6  |11 |M  |5052506|293541|590|
+------------+---+----+----+----+---+---+---+---+---+---+---+-------+------+---+
only showing top 5 rows

shape: 484045 x 15

Véhicules¶

In [ ]:
vehicules.show(truncate=False, n=5)
+------------+----+----+------+---+----+----+----+-------+
|Num_Acc     |senc|catv|occutc|obs|obsm|choc|manv|num_veh|
+------------+----+----+------+---+----+----+----+-------+
|201100000001|0   |2   |0     |0  |2   |1   |17  |A01    |
|201100000001|0   |7   |0     |0  |0   |6   |15  |B02    |
|201100000002|0   |10  |0     |0  |2   |2   |10  |A01    |
|201100000002|0   |33  |0     |0  |2   |1   |1   |B02    |
|201100000003|0   |7   |0     |0  |1   |3   |1   |A01    |
+------------+----+----+------+---+----+----+----+-------+
only showing top 5 rows

Usagers¶

In [ ]:
usagers.show(truncate=False, n=5)
print(f"shape: {usagers.count()} x {len(usagers.columns)}")
+------------+-----+----+----+----+------+----+----+----+-----+-------+-------+
|Num_Acc     |place|catu|grav|sexe|trajet|secu|locp|actp|etatp|an_nais|num_veh|
+------------+-----+----+----+----+------+----+----+----+-----+-------+-------+
|201100000001|1    |1   |3   |1   |5     |21  |0   |0   |0    |1995   |A01    |
|201100000001|1    |1   |1   |1   |5     |11  |0   |0   |0    |1949   |B02    |
|201100000002|1    |1   |1   |1   |0     |11  |0   |0   |0    |1967   |A01    |
|201100000002|1    |1   |3   |1   |0     |21  |0   |0   |0    |1963   |B02    |
|201100000003|1    |1   |1   |1   |1     |11  |0   |0   |0    |1989   |A01    |
+------------+-----+----+----+----+------+----+----+----+-----+-------+-------+
only showing top 5 rows

shape: 1078041 x 12

Lieux¶

In [ ]:
lieux.show(truncate=False, n=5)
+------------+----+----+----+----+----+---+---+---+----+----+----+------+-------+----+-----+----+----+
|Num_Acc     |catr|voie|v1  |v2  |circ|nbv|pr |pr1|vosp|prof|plan|lartpc|larrout|surf|infra|situ|env1|
+------------+----+----+----+----+----+---+---+---+----+----+----+------+-------+----+-----+----+----+
|201100000001|3   |39  |null|null|2   |2  |5  |535|0   |1   |1   |0     |60     |1   |0    |1   |0   |
|201100000002|3   |41  |null|B   |2   |0  |0  |700|0   |0   |0   |0     |0      |0   |0    |0   |0   |
|201100000003|3   |39  |null|null|2   |2  |10 |600|0   |1   |1   |0     |60     |1   |0    |1   |99  |
|201100000004|3   |39  |null|null|2   |0  |8  |400|0   |1   |1   |0     |58     |1   |0    |1   |0   |
|201100000005|3   |39  |null|null|2   |2  |7  |450|0   |1   |1   |0     |0      |1   |0    |1   |3   |
+------------+----+----+----+----+----+---+---+---+----+----+----+------+-------+----+-----+----+----+
only showing top 5 rows

Des informations pertinentes ?¶

Répartition des accidents en et hors agglomération¶

In [ ]:
import matplotlib.pyplot as plt

k = caracteristiques_rdd.map(lambda x: ("Agglo" if x.agg-1 else "Hors Agglo", 1)).reduceByKey(lambda x, y: x + y).collect()

labels = [x[0] for x in k]
sizes = [x[1] for x in k]
explode = (0, 0.1)  # permet de faire ressortir la part "Autres"

fig1, ax1 = plt.subplots()
ax1.pie(sizes, explode=explode, labels=labels, autopct='%1.1f%%', shadow=True, startangle=90)
ax1.axis('equal')
ax1.title.set_text("Répartition des accidents survenus en et hors agglomération")

plt.show()
                                                                                

On observe qu'une grande majorité d'accidents à lieu en agglomération

Comparons les taux de mortalité des accidents en et hors agglomération¶

In [ ]:
# comparaison des taux de mortalité entre les accidents survenus en agglomération et hors agglomération

# on ne garde que les accidents mortels
usagers_morts = usagers_rdd.filter(lambda x: x.grav == 2)

# accidents mortels en agglomération
usagers_morts_horsagglo = usagers_morts.join(caracteristiques_rdd.filter(lambda x: x.agg == 1))

# accidents mortels en agglomération
usagers_morts_agglo = usagers_morts.join(caracteristiques_rdd.filter(lambda x: x.agg == 2))


labels = "Accidents mortels en agglomération", "Accidents mortels hors agglomération"
sizes = usagers_morts_agglo.count(), usagers_morts_horsagglo.count()
explode = (0, 0.1)  # permet de faire ressortir la part 

fig1, ax1 = plt.subplots()
ax1.pie(sizes, explode=explode, labels=labels, autopct='%1.1f%%', shadow=True, startangle=90)
ax1.axis('equal')
ax1.title.set_text("Répartition des accidents mortels survenus en et hors agglomération")

plt.show()
                                                                                

Nombre d'accidents par département trié dans l'ordre décroissant¶

In [ ]:
# on veut afficher le top 5 des departements avec le plus d'accidents
# on commence par map pour avoir un tuple (dep, 1)
# puis reduceByKey pour compter le nombre d'accidents par departement
# ensuite sortBy pour trier les departements par ordre décroissant
# et enfin un take pour afficher les 5 premiers

caracteristiques_rdd.map(lambda x: (x.dep, 1)).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], ascending=False).take(5)
                                                                                
Out[ ]:
[(750, 51653), (130, 31405), (930, 22592), (920, 21152), (940, 20289)]

On observe que le top cinq représente essentiellement Paris, excepté pour Marseille qui se place en seconde position

In [ ]:
# on commence par map pour avoir un tuple (paris, 1) ou (hors paris, 1)
# puis reduceByKey pour compter le nombre d'accidents par departement
# enfin collect pour afficher les résultats

k = caracteristiques_rdd.map(lambda x: ("Paris" if x.dep in [750, 940, 920, 930] else "Hors Paris", 1)).reduceByKey(lambda x, y: x + y).collect()
print(k)

labels = [x[0] for x in k]
sizes = [x[1] for x in k]
explode = (0, 0.1)  # permet de faire ressortir la part "Autres"

fig1, ax1 = plt.subplots()
ax1.pie(sizes, explode=explode, labels=labels, autopct='%1.1f%%', shadow=True, startangle=90)
ax1.axis('equal')
ax1.title.set_text("Répartition des accidents Paris vs Hors Paris entre 2011 et 2018")

plt.show()
[Stage 864:==============>                                          (1 + 3) / 4]
[('Hors Paris', 368359), ('Paris', 115686)]
                                                                                

On observe qu'environ 1 accidents sur 4 à lieu à Paris, verifions si c'est le cas tout les ans

In [ ]:
# create a subplot with 2 rows and 4 columns
fig, ax = plt.subplots(2, 4, figsize=(20, 10))

# on plot un camembert du nombre d'accidents à Paris et hors Paris pour chaque année entre 2011 et 2018
for i in range(11, 19):
    k = caracteristiques_rdd.filter(lambda x: x.an == i).map(lambda x: ("Paris" if x.dep in [750, 930, 940, 920] else "Hors Paris", 1)).reduceByKey(lambda x, y: x + y).collect()
    labels = k[0][0], k[1][0]
    sizes = k[0][1], k[1][1]
    explode = (0, 0.1)  # permet de faire ressortir la part "Autres"
    ax[int((i-11)/4)][(i-11)%4].pie(sizes, explode=explode, labels=labels, autopct='%1.1f%%', shadow=True, startangle=90)
    ax[int((i-11)/4)][(i-11)%4].axis('equal')
    ax[int((i-11)/4)][(i-11)%4].set_title(f"Accidents en {i+2000}")
    
                                                                                

les fichier geojson ont été récupéré sur https://github.com/gregoiredavid/france-geojson

In [ ]:
import geopandas as gpd
sf = gpd.read_file('geojson/departements.geojson')
In [ ]:
# on va maintenant afficher la répartition des accidents sur la carte de France
# on commence par créer un dictionnaire qui associe à chaque département le nombre d'accidents
# puis on crée une nouvelle colonne dans le dataframe sf qui contient le nombre d'accidents pour chaque département
# enfin on affiche la carte

#remove tout les accidents qui ne sont pas en France
k = caracteristiques_rdd.filter(lambda x: x.gps == "M")

accidents_par_dep = k.map(lambda x: ("2A" if x.dep == 201 else "2B" if x.dep == 202 else x.dep//10, 1)).reduceByKey(lambda x, y: x + y).map(lambda x: (str(x[0]).zfill(2) if isinstance(x[0], int) else x[0], x[1])).collectAsMap()

sf['nb_accidents'] = sf['code'].map(accidents_par_dep)
ax = sf.plot(column='nb_accidents', legend=True, figsize=(20, 10), cmap='OrRd', missing_kwds={'color': 'grey', "hatch": "///"})
ax.set_title("Répartition des accidents sur la carte de France")
# annote paris  bouche du rhone
ax.annotate('Paris', xy=(2.35, 48.85), xytext=(2.35, 48.85), color='black', fontsize=12)
ax.annotate('Bouches du Rhone', xy=(5.35, 43.3), xytext=(5.35, 43.3), color='black', fontsize=12)
                                                                                
Out[ ]:
Text(5.35, 43.3, 'Bouches du Rhone')
In [ ]:
# on va maintenant afficher la répartition normalisé des accidents mortels sur la carte de France afin de visualisé les departements les plus dangereux
# on commence par créer un dictionnaire qui associe à chaque département le nombre d'accidents mortels
# puis on crée une nouvelle colonne dans le dataframe sf qui contient le nombre d'accidents mortels pour chaque département
# enfin on affiche la carte

sf = gpd.read_file('geojson/departements.geojson')

# filter usagers df to keep only dead people
usagers_mort = usagers.filter(usagers.grav == 2)

usagers_implique = usagers.join(caracteristiques.filter(caracteristiques.gps == "M"), usagers.Num_Acc == caracteristiques.Num_Acc, 'inner').rdd

usagers_mort_carac = usagers_mort.join(caracteristiques.filter(caracteristiques.gps == "M"), usagers_mort.Num_Acc == caracteristiques.Num_Acc, 'inner').rdd

mort_par_dep = usagers_mort_carac.map(lambda x: ("2A" if x.dep == 201 else "2B" if x.dep == 202 else x.dep//10, 1)).reduceByKey(lambda x, y: x + y).map(lambda x: (str(x[0]).zfill(2) if isinstance(x[0], int) else x[0], x[1])).collectAsMap()
implique_par_dep = usagers_implique.map(lambda x: ("2A" if x.dep == 201 else "2B" if x.dep == 202 else x.dep//10, 1)).reduceByKey(lambda x, y: x + y).map(lambda x: (str(x[0]).zfill(2) if isinstance(x[0], int) else x[0], x[1])).collectAsMap()


sf['nb_morts'] = sf['code'].map(mort_par_dep)
sf['nm_implique'] = sf['code'].map(implique_par_dep)

sf['nb_morts_normalise'] = sf['nb_morts']/sf['nm_implique']

ax = sf.plot(column='nb_morts_normalise', legend=True, figsize=(20, 10), cmap='OrRd', missing_kwds={'color': 'grey', "hatch": "///"})
ax.set_title("La taux de mortalité par personne impliquées des accidents par département")

#sort sf by nb_morts_normalise
sf = sf.sort_values(by=['nb_morts_normalise'], ascending=False)
sf.head(10)
                                                                                
Out[ ]:
code nom geometry nb_morts nm_implique nb_morts_normalise
38 81 Tarn POLYGON ((1.99017 44.14945, 2.02477 44.15513, ... 215 2250 0.095556
6 39 Jura POLYGON ((5.51854 47.30418, 5.52327 47.30548, ... 192 2260 0.084956
30 48 Lozère POLYGON ((3.36135 44.97141, 3.37032 44.96998, ... 85 1055 0.080569
11 55 Meuse POLYGON ((4.95099 49.23687, 4.96403 49.24526, ... 111 1379 0.080493
27 24 Dordogne POLYGON ((0.62974 45.71457, 0.64015 45.69790, ... 287 3592 0.079900
65 84 Vaucluse MULTIPOLYGON (((4.88812 44.33169, 4.89533 44.3... 311 3901 0.079723
39 82 Tarn-et-Garonne POLYGON ((1.06408 44.37851, 1.08240 44.38141, ... 185 2351 0.078690
94 79 Deux-Sèvres POLYGON ((-0.89196 46.97582, -0.87973 46.97580... 224 2870 0.078049
7 40 Landes POLYGON ((-1.25389 44.46760, -1.19248 44.48121... 254 3336 0.076139
81 46 Lot POLYGON ((1.44826 45.01931, 1.46198 45.01370, ... 112 1501 0.074617
In [ ]:
# on va maintenant afficher la répartition normalisé des accidents mortels sur la carte de France afin de visualisé les departements les plus dangereux
# on commence par créer un dictionnaire qui associe à chaque département le nombre d'accidents mortels
# puis on crée une nouvelle colonne dans le dataframe sf qui contient le nombre d'accidents mortels pour chaque département
# enfin on affiche la carte

sf = gpd.read_file('geojson/departements.geojson')

# filter usagers df to keep only dead people
usagers_mort = usagers.filter(usagers.grav == 1)

usagers_implique = usagers.join(caracteristiques.filter(caracteristiques.gps == "M"), usagers.Num_Acc == caracteristiques.Num_Acc, 'inner').rdd

usagers_mort_carac = usagers_mort.join(caracteristiques.filter(caracteristiques.gps == "M"), usagers_mort.Num_Acc == caracteristiques.Num_Acc, 'inner').rdd

mort_par_dep = usagers_mort_carac.map(lambda x: ("2A" if x.dep == 201 else "2B" if x.dep == 202 else x.dep//10, 1)).reduceByKey(lambda x, y: x + y).map(lambda x: (str(x[0]).zfill(2) if isinstance(x[0], int) else x[0], x[1])).collectAsMap()
implique_par_dep = usagers_implique.map(lambda x: ("2A" if x.dep == 201 else "2B" if x.dep == 202 else x.dep//10, 1)).reduceByKey(lambda x, y: x + y).map(lambda x: (str(x[0]).zfill(2) if isinstance(x[0], int) else x[0], x[1])).collectAsMap()


sf['nb_morts'] = sf['code'].map(mort_par_dep)
sf['nm_implique'] = sf['code'].map(implique_par_dep)

sf['nb_morts_normalise'] = sf['nb_morts']/sf['nm_implique']

ax = sf.plot(column='nb_morts_normalise', legend=True, figsize=(20, 10), cmap='Greens', missing_kwds={'color': 'grey', "hatch": "///"})
ax.set_title("La taux de personnes indemnes par personne impliquées des accidents par département")

#sort sf by nb_morts_normalise
sf = sf.sort_values(by=['nb_morts_normalise'], ascending=False)
sf.head(10)
                                                                                
Out[ ]:
code nom geometry nb_morts nm_implique nb_morts_normalise
67 94 Val-de-Marne POLYGON ((2.33190 48.81701, 2.36395 48.81631, ... 11806 25550 0.462074
44 92 Hauts-de-Seine POLYGON ((2.29097 48.95097, 2.32697 48.94536, ... 5398 11690 0.461762
19 93 Seine-Saint-Denis POLYGON ((2.55306 49.00982, 2.56579 49.01240, ... 8012 17463 0.458799
36 75 Paris POLYGON ((2.31989 48.90046, 2.38515 48.90201, ... 16078 36585 0.439470
54 33 Gironde POLYGON ((-1.02574 45.57469, -0.92654 45.49613... 7069 16496 0.428528
81 46 Lot POLYGON ((1.44826 45.01931, 1.46198 45.01370, ... 642 1501 0.427715
89 91 Essonne POLYGON ((2.22655 48.77610, 2.23297 48.76619, ... 5240 12296 0.426155
33 64 Pyrénées-Atlantiques POLYGON ((-0.24283 43.58498, -0.23503 43.58336... 4870 11432 0.425997
34 69 Rhône POLYGON ((4.38807 46.21979, 4.38829 46.24796, ... 7632 18235 0.418536
90 95 Val-d'Oise POLYGON ((1.70436 49.23220, 1.72966 49.22920, ... 5624 13475 0.417365

Affichons le nombre d'accidents ayant eu lieu au Rond-Point des Champs-Élysées entre 2011 et 2018¶

In [ ]:
lat_min, lat_max, long_min, long_max  = 4886809, 4886971, 230856, 231103
rond_point_marceldassault = caracteristiques_rdd.filter(lambda x: x.lat != None and x.long != None).filter(lambda x: lat_min <= x.lat <= lat_max and long_min <= x.long <= long_max).count() #175k accidents sans coordonnées GPS

lat_min, lat_max, long_min, long_max  = 4887284, 4887300, 229537, 229819
rond_point_arctriomphe= caracteristiques_rdd.filter(lambda x: x.lat != None and x.long != None).filter(lambda x: lat_min <= x.lat <= lat_max and long_min <= x.long <= long_max).count() # 1 accident

print(f"Nombre d'accidents sur le rond point Marcel Dassault : {rond_point_marceldassault}")
print(f"Nombre d'accidents sur le rond point de l'Arc de Triomphe : {rond_point_arctriomphe}")
[Stage 910:==============>                                          (1 + 3) / 4]
Nombre d'accidents sur le rond point Marcel Dassault : 34
Nombre d'accidents sur le rond point de l'Arc de Triomphe : 1
                                                                                

Nombre d'accidents par type d'intersection¶

In [ ]:
mapping = {1: "Hors Intersection", 2: "Intersection en X", 3: "Intersection en T", 4: "Intersection en Y", 5: "Intersection à +4 branches", 6: "Giratoies", 7: "Places", 8: "Passage à niveau", 9: "Autres"}
k = caracteristiques_rdd.map(lambda x: (mapping.get(x.int, "Autres"), 1)).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], ascending=False).collect()
labels = [x[0] for x in k]
sizes = [x[1] for x in k]

fig1, ax1 = plt.subplots()
ax1.pie(sizes, labels=labels, autopct='%1.1f%%', shadow=True, startangle=90)
ax1.axis('equal')
ax1.title.set_text("Répartition des accidents par type d'intersection")

plt.show()
                                                                                
In [ ]:
# on va maintenant rassembler les types d'intersection en 3 catégories : Intersection, Giratoire et Autres
# on commence par créer un dictionnaire pour faire le mapping
# puis on map pour remplacer les types d'intersection par les catégories
# ensuite on fait un reduceByKey pour compter le nombre d'accidents par catégorie

mapping = {2: "Carrefour", 3: "Intersection", 4: "Intersection", 5: "Intersection", 6: "Giratoies", 7: "Autres", 8: "Autres", 9: "Autres"}
k = caracteristiques_rdd.filter(lambda x: x.int != 1).map(lambda x: (mapping.get(x.int, "Autres"), 1)).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], ascending=False).collect()
labels = [x[0] for x in k]
sizes = [x[1] for x in k]

fig1, ax1 = plt.subplots()
ax1.pie(sizes, labels=labels, autopct='%1.1f%%', shadow=True, startangle=90)
ax1.axis('equal')
ax1.title.set_text("Répartition des accidents par type d'intersection")

plt.show()
                                                                                

On observe qu'un accident sur deux intervient en rond-point ou en carrefour, surement dû à un refus de priorité

In [ ]:
sf = gpd.read_file('geojson/departements.geojson')

# on veut montrer le type d'intersection avec le plus d'accidents par département

# on commence par créer un dictionnaire pour faire le mapping
# puis on map pour remplacer les types d'intersection par les catégories
# ensuite on fait un reduceByKey pour compter le nombre d'accidents par catégorie

mappings = {2: "Carrefour", 3: "Intersection en T/Y", 4: "Intersection en T/Y", 5: "Intersection à plus de 4 branches", 6: "Giratoies", 7: "Place", 8: "Passage à niveau", 9: "Autres"}
#k = caracteristiques_rdd.filter(lambda x: x.int != 1 and x.int != None).map(lambda x: (x.dep, mapping.get(x.int, "Autres"), 1))

k = caracteristiques.dropna(subset=['int', 'dep']).filter(caracteristiques.int != 1).filter(caracteristiques.gps == "M").toPandas()
k['int'] = k["int"].map(mappings)

# divse dep par 10 sauf pour 201 et 202 qu'on remplace par 2A et 2B
k['dep'] = k['dep'].apply(lambda x: "2A" if x == 201 else "2B" if x == 202 else str(x//10).zfill(2))
k = k.groupby(['dep', 'int']).size().reset_index(name='counts')

#for each dep, keep the intersection type with the most accidents
k = k.groupby('dep').apply(lambda x: x[x.counts == x.counts.max()]).reset_index(drop=True)
#k = k.groupby('dep').apply(lambda x: x[x.counts == x.counts.max()]).reset_index(drop=True).drop(columns=['counts'])

#merge with sf
sf = sf.merge(k, left_on='code', right_on='dep', how='left')

#plot
ax = sf.plot(column='int', figsize=(20, 10), legend=True)
ax.set_title("Type d'intersection le plus dangereux par département")

ax.annotate('Ardèche', xy=(4.5, 44.5), xytext=(4.5, 44.5), fontsize=12, color='white')
Out[ ]:
Text(4.5, 44.5, 'Ardèche')
In [ ]:
# scatter plot la longitude et la latitude

fig, ax = plt.subplots(figsize=(20, 10))
k = caracteristiques_rdd.filter(lambda x: x.long != None and x.long <= 1.5*10e5).filter(lambda x: x.lat != None and 3*10e5 <= x.lat <= 5.3*10e5)
ax.scatter(k.map(lambda x: x.long).collect(), k.map(lambda x: x.lat).collect(), s=0.1)
ax.set_title("Répartition des accidents en France")
ax.set_xlabel("Longitude")
ax.set_ylabel("Latitude")
plt.show()
                                                                                
In [ ]:
# nombre d'accident suivant le motif de deplacement
usagers_rdd_dropdup = usagers.dropDuplicates(["Num_Acc"]).rdd

mappings = {1: "Domicile - travail", 2: "Domicile - ecole", 3:"Domicile - course", 4:"Deplacement pro", 5:"Loisir", 9:"Autre", 0:"Autre"}


k = usagers_rdd_dropdup.filter(lambda x: x.trajet != None).map(lambda x: (mappings.get(x.trajet) or x.trajet, 1)).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], ascending=False).collect()

#plot a chart with the number of accident by type of movement
labels = [x[0] for x in k]
sizes = [x[1] for x in k]
fig1, ax1 = plt.subplots()
ax1.pie(sizes, labels=labels, autopct='%1.1f%%', shadow=True, startangle=90)
ax1.axis('equal')
ax1.title.set_text("Répartition des accidents par motif de déplacement")
plt.show()
                                                                                
In [ ]:
# nombre d'accident suivant le motif de deplacement
usagers_rdd_dropdup = usagers.dropDuplicates(["Num_Acc"]).rdd

# on ne garde que les trajet domicile - travail
k = usagers_rdd_dropdup.filter(lambda x: x.trajet != None and x.trajet == 1)

k = k.map(lambda x: (x.sexe, 1)).reduceByKey(lambda x, y: x + y).collect()
print(k)
[Stage 941:>                                                        (0 + 4) / 4]
[(1, 55598), (2, 23249)]
                                                                                
In [ ]:
# affichons le sexe des victimes les plus touché par departement
sf = gpd.read_file('geojson/departements.geojson')


usagers_implique = usagers.join(caracteristiques.filter(caracteristiques.gps == "M"), usagers.Num_Acc == caracteristiques.Num_Acc, 'inner').rdd

# remplaçon les code pour les departements
#usagers_implique = usagers_implique.map(lambda x: ((str(x[0]).zfill(2) if x.dep != 201 and x.dep != 202 else x.dep, 1)))*
# lambda x: (("2A" is x.dep == 201 else "2B" if x.dep == 202 else str(x[0]).zfill(2), 1))

# compter le nombre de mort homme et femme par departement
k_hommes_mort = usagers_implique.filter(lambda x: x.sexe == 1).filter(lambda x: x.grav == 2).map(lambda x: (x.dep, 1)).reduceByKey(lambda x, y: x + y)
k_hommes_mort = k_hommes_mort.map(lambda x: ("2A" if x[0] == 201 else "2B" if x[0] == 202 else x[0]//10, x[1])).map(lambda x: (str(x[0]).zfill(2) if isinstance(x[0], int) else x[0], x[1])).collectAsMap()

k_femmes_mort = usagers_implique.filter(lambda x: x.sexe == 2).filter(lambda x: x.grav == 2).map(lambda x: (x.dep, 1)).reduceByKey(lambda x, y: x + y)
k_femmes_mort =  k_femmes_mort.map(lambda x: ("2A" if x[0] == 201 else "2B" if x[0] == 202 else x[0]//10, x[1])).map(lambda x: (str(x[0]).zfill(2) if isinstance(x[0], int) else x[0], x[1])).collectAsMap()

# compte le nombre de homme et femme par departement
k_hommes_impliquee = usagers_implique.filter(lambda x: x.sexe == 1).map(lambda x: (x.dep, 1)).reduceByKey(lambda x, y: x + y)
k_hommes_impliquee = k_hommes_impliquee.map(lambda x: ("2A" if x[0] == 201 else "2B" if x[0] == 202 else x[0]//10, x[1])).map(lambda x: (str(x[0]).zfill(2) if isinstance(x[0], int) else x[0], x[1])).collectAsMap()

k_femmes_impliquee = usagers_implique.filter(lambda x: x.sexe == 2).map(lambda x: (x.dep, 1)).reduceByKey(lambda x, y: x + y)
k_femmes_impliquee =  k_femmes_impliquee.map(lambda x: ("2A" if x[0] == 201 else "2B" if x[0] == 202 else x[0]//10, x[1])).map(lambda x: (str(x[0]).zfill(2) if isinstance(x[0], int) else x[0], x[1])).collectAsMap()

# on ajoute les données au geojson
sf['hommes_mort'] = sf['code'].map(k_hommes_mort)
sf['femmes_mort'] = sf['code'].map(k_femmes_mort)
sf['hommes_implique'] = sf['code'].map(k_hommes_impliquee)
sf['femmes_implique'] = sf['code'].map(k_femmes_impliquee)

# on calcule le pourcentage de mort par departement
sf['pourcentage_mort_homme'] = sf['hommes_mort'] / sf['hommes_implique']
sf['pourcentage_mort_femme'] = sf['femmes_mort'] / sf['femmes_implique']

# on affiche pour chaque departement le sexe le plus touché
sf['sexe_mort'] = sf.apply(lambda x: "Homme" if x['pourcentage_mort_homme'] > x['pourcentage_mort_femme'] else "Femme", axis=1)


# on plot 3 graphique, les departements ou les hommes sont les plus touchés, les departements ou les femmes sont les plus touchés et les departements les plus touché suivant le sexe
fig, ax = plt.subplots(1, 3, figsize=(20, 10))

# on plot les taux de mortalité hommes par departement
sf.plot(column='pourcentage_mort_homme', ax=ax[0], legend=True, legend_kwds={'label': "Pourcentage de mort par département", 'orientation': "horizontal"})
ax[0].set_title("Pourcentage de mort par département pour les hommes")

# on plot les taux de mortalité femmes par departement
sf.plot(column='pourcentage_mort_femme', ax=ax[1], legend=True, legend_kwds={'label': "Pourcentage de mort par département", 'orientation': "horizontal"})
ax[1].set_title("Pourcentage de mort par département pour les femmes")

# on plot le sexe le plus touché par departement
sf.plot(column='sexe_mort', ax=ax[2], legend=True)
ax[2].set_title("Sexe le plus touché par département")

plt.show()
sf.head(10)
                                                                                
Out[ ]:
code nom geometry hommes_mort femmes_mort hommes_implique femmes_implique pourcentage_mort_homme pourcentage_mort_femme sexe_mort
0 02 Aisne POLYGON ((3.17270 50.01200, 3.18220 50.01234, ... 229 59 2964 1346 0.077260 0.043834 Homme
1 10 Aube POLYGON ((3.41479 48.39027, 3.42208 48.41334, ... 133 49 2324 1268 0.057229 0.038644 Homme
2 14 Calvados POLYGON ((-1.11962 49.35557, -1.11503 49.36240... 182 64 4331 2422 0.042023 0.026424 Homme
3 15 Cantal POLYGON ((2.50841 45.47850, 2.52444 45.48070, ... 69 22 966 441 0.071429 0.049887 Homme
4 28 Eure-et-Loir POLYGON ((0.81482 48.67016, 0.82767 48.68072, ... 217 65 3238 1557 0.067017 0.041747 Homme
5 35 Ille-et-Vilaine MULTIPOLYGON (((-2.00690 48.56611, -2.04621 48... 305 99 8286 4758 0.036809 0.020807 Homme
6 39 Jura POLYGON ((5.51854 47.30418, 5.52327 47.30548, ... 135 57 1558 702 0.086650 0.081197 Homme
7 40 Landes POLYGON ((-1.25389 44.46760, -1.19248 44.48121... 195 59 2234 1102 0.087287 0.053539 Homme
8 42 Loire POLYGON ((3.89954 46.27591, 3.90551 46.27160, ... 179 73 7254 4159 0.024676 0.017552 Homme
9 45 Loiret POLYGON ((1.99409 48.28658, 2.00724 48.28469, ... 259 74 4484 2350 0.057761 0.031489 Homme
In [ ]:
# accident
vehicules_rdd_dropdup = vehicules.dropDuplicates(["Num_Acc", "catv"]).drop().rdd

mappings = {1: "deux roues", 2: "deux roues", 4:"deux roues", 5:"deux roues", 6:"deux roues", 7: "voiture", 30:"deux roues", 31:"deux roues", 32:"deux roues", 33:"deux roues", 34:"deux roues", 18: "bus", 37: "bus", 38: "bus", 19:"tramway", 40:"tramway", 99: "piéton"}

# barplot du nombre d'accident par type de véhicule
k = vehicules_rdd_dropdup.filter(lambda x: x.catv != None).map(lambda x: (mappings.get(x.catv, "autres"), 1)).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], ascending=False).filter(lambda x: x[0] != "autres").collect()

# barplot
labels = [x[0] for x in k]
sizes = [x[1] for x in k]

fig, ax = plt.subplots(figsize=(20, 10))
ax.bar(labels, sizes)
ax.set_title("Répartition des accidents par type de véhicule")
ax.set_xlabel("Type de véhicule")
ax.set_ylabel("Nombre d'accidents")
plt.show()
                                                                                
In [ ]:
#piechar taux d'accident mortel par type de véhicule

usagers_dropna = usagers.dropna(subset=["grav"])

#filter the data to keep only the accident with a death
usagers_dropna = usagers_dropna.filter(usagers_dropna.grav == 2)

#join the data with the vehicules data to keep only the vehicule type
usagers_dropna = usagers_dropna.join(vehicules, usagers_dropna.Num_Acc == vehicules.Num_Acc, "inner")


mappings = {1: "deux roues", 2: "deux roues", 4:"deux roues", 5:"deux roues", 6:"deux roues", 7: "voiture", 30:"deux roues", 31:"deux roues", 32:"deux roues", 33:"deux roues", 34:"deux roues", 18: "bus", 37: "bus", 38: "bus", 19:"tramway", 40:"tramway", 99: "piéton"}

# transform to rdd and keep only the vehicule type
usagers_mort_typev = usagers_dropna.rdd.filter(lambda x: x.catv != None).map(lambda x: (mappings.get(x.catv, "autres"), 1)).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], ascending=False).filter(lambda x: x[0] != "autres").collect()

# barplot
labels = [x[0] for x in usagers_mort_typev]
sizes = [x[1] for x in usagers_mort_typev]

fig, ax = plt.subplots(figsize=(20, 10))
ax.bar(labels, sizes)
ax.set_title("Répartition des personnes mortes dans un accident par type de véhicule")
ax.set_xlabel("Type de véhicule")
ax.set_ylabel("Nombre d'accidents")
plt.show()
                                                                                
In [ ]:
# normalisation des taux de mort suivant le nombre d'accident par type de véhicule
# barplot

# divide item in usagers_mort_typev by item in k
k2 = [(x[0], 100* (x[1]/y[1])) for x, y in zip(usagers_mort_typev, k)]
labels = [x[0] for x in k2]
sizes = [x[1] for x in k2]

fig, ax = plt.subplots(figsize=(20, 10))
ax.bar(labels, sizes)
ax.set_title("Répartition des Taux de mortalité par type de véhicule")
ax.set_xlabel("Type de véhicule")
ax.set_ylabel("Taux de mortalité")
plt.show()
In [ ]:
# graphique repartition des accidents par age pour chaque année
# create a subplot with 2 rows and 4 columns
fig, ax = plt.subplots(2, 4, figsize=(20, 10))

# plot le top3 des années de naissance des conducteurs par année dans un barplot
for i in range(11, 19):
    #acc_per_year = usagers_rdd.filter(lambda x: str(x.Num_Acc)[:4] == str(i+2000)).groupBy(lambda x: x.Num_Acc).distinct().count()
    acc_per_year = caracteristiques_rdd.filter(lambda x: x.an == i).count()

    k = usagers_rdd.filter(lambda x: x.an_nais != None).filter(lambda x: str(x.Num_Acc)[:4] == str(i+2000)).map(lambda x: (i+2000 - x.an_nais, 1)).map(lambda x: ("25 ans et -" if x[0] < 25 else "25-30 ans" if 25 <= x[0] < 30 else "30-35 ans" if 30 <= x[0] < 35 else '35-40 ans' if 35 <= x[0] < 40 else '40-45 ans' if 40 <= x[0] < 45 else '45-50 ans' if 45 <= x[0] < 50 else "50 ans et +", 1)).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[0]).collect()

    labels = [x[0] for x in k]
    sizes = [x[1] for x in k]
    ax[int((i-11)/4)][(i-11)%4].pie(sizes, labels=labels, autopct='%1.1f%%', shadow=True, startangle=90)
    ax[int((i-11)/4)][(i-11)%4].axis('equal')
    ax[int((i-11)/4)][(i-11)%4].set_title(f"{acc_per_year} accidents en {i+2000}")

fig.legend(labels, loc="upper right")
                                                                                
Out[ ]:
<matplotlib.legend.Legend at 0x7fb210a92550>