from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[4]").getOrCreate()
Pas forcement utile dans notre cas étant donné que les datasets ne sont pas très grands.
L'utilisation de inferSchema=true
aurait suffit mais risque de prends plus de temps et n'est pas forcément fiable
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType
schema_caracteristiques = StructType([
StructField("Num_Acc", LongType()),
StructField("an", IntegerType()),
StructField("mois", IntegerType()),
StructField("jour", IntegerType()),
StructField("hrmn", IntegerType()),
StructField("lum", IntegerType()),
StructField("agg", IntegerType()),
StructField("int", IntegerType()),
StructField("atm", IntegerType()),
StructField("col", IntegerType()),
StructField("com", IntegerType()),
#StructField("adr", StringType()),
StructField("gps", StringType()),
StructField("lat", IntegerType()),
StructField("long", IntegerType()),
StructField("dep", IntegerType()),
])
schema_vehicules = StructType([
StructField("Num_Acc", LongType()),
StructField("senc", IntegerType()),
StructField("catv", IntegerType()),
StructField("occutc", IntegerType()),
StructField("obs", IntegerType()),
StructField("obsm", IntegerType()),
StructField("choc", IntegerType()),
StructField("manv", IntegerType()),
StructField("num_veh", StringType()),
])
schema_usagers = StructType([
StructField("Num_Acc", LongType()),
StructField("place", IntegerType()),
StructField("catu", IntegerType()),
StructField("grav", IntegerType()),
StructField("sexe", IntegerType()),
StructField("trajet", IntegerType()),
StructField("secu", StringType()),
StructField("locp", IntegerType()),
StructField("actp", IntegerType()),
StructField("etatp", IntegerType()),
StructField("an_nais", IntegerType()),
StructField("num_veh", StringType()),
])
schema_lieux = StructType([
StructField("Num_Acc", LongType()),
StructField("catr", IntegerType()),
StructField("voie", StringType()),
StructField("v1", IntegerType()),
StructField("v2", StringType()),
StructField("circ", IntegerType()),
StructField("nbv", IntegerType()),
StructField("pr", IntegerType()),
StructField("pr1", IntegerType()),
StructField("vosp", IntegerType()),
StructField("prof", IntegerType()),
StructField("plan", IntegerType()),
StructField("lartpc", IntegerType()),
StructField("larrout", IntegerType()),
StructField("surf", IntegerType()),
StructField("infra", IntegerType()),
StructField("situ", IntegerType()),
StructField("env1", IntegerType()),
])
# On supprime la colonne adr contenant des chaînes de caractères avec un encodage pouvant poser problème
caracteristiques = spark.read.options(header=True, delimiter=",").schema(schema_caracteristiques).csv("./accidents/carac.txt")
#caracteristiques = caracteristiques.drop('adr')
vehicules = spark.read.options(header=True, delimiter=",").schema(schema_vehicules).csv("./accidents/vehicules*.csv")
usagers = spark.read.options(header=True, delimiter=",").schema(schema_usagers).csv("./accidents/usagers*.csv")
lieux = spark.read.options(header=True, delimiter=",").schema(schema_lieux).csv("./accidents/lieux*.csv")
caracteristiques_rdd = caracteristiques.rdd
vehicules_rdd = vehicules.rdd
usagers_rdd = usagers.rdd
lieux_rdd = lieux.rdd
caracteristiques.show(truncate=False, n=5)
print(f"shape: {caracteristiques.count()} x {len(caracteristiques.columns)}")
+------------+---+----+----+----+---+---+---+---+---+---+---+-------+------+---+ |Num_Acc |an |mois|jour|hrmn|lum|agg|int|atm|col|com|gps|lat |long |dep| +------------+---+----+----+----+---+---+---+---+---+---+---+-------+------+---+ |201100000001|11 |1 |22 |1400|1 |2 |1 |1 |3 |5 |M |5053589|295262|590| |201100000002|11 |6 |24 |1500|1 |1 |1 |1 |3 |11 |M |5051652|293898|590| |201100000003|11 |9 |16 |645 |2 |2 |1 |1 |6 |52 |M |5051080|290322|590| |201100000004|11 |9 |22 |1515|1 |2 |1 |1 |5 |11 |M |5051861|293043|590| |201100000005|11 |10 |24 |1545|1 |2 |1 |1 |6 |11 |M |5052506|293541|590| +------------+---+----+----+----+---+---+---+---+---+---+---+-------+------+---+ only showing top 5 rows shape: 484045 x 15
vehicules.show(truncate=False, n=5)
+------------+----+----+------+---+----+----+----+-------+ |Num_Acc |senc|catv|occutc|obs|obsm|choc|manv|num_veh| +------------+----+----+------+---+----+----+----+-------+ |201100000001|0 |2 |0 |0 |2 |1 |17 |A01 | |201100000001|0 |7 |0 |0 |0 |6 |15 |B02 | |201100000002|0 |10 |0 |0 |2 |2 |10 |A01 | |201100000002|0 |33 |0 |0 |2 |1 |1 |B02 | |201100000003|0 |7 |0 |0 |1 |3 |1 |A01 | +------------+----+----+------+---+----+----+----+-------+ only showing top 5 rows
usagers.show(truncate=False, n=5)
print(f"shape: {usagers.count()} x {len(usagers.columns)}")
+------------+-----+----+----+----+------+----+----+----+-----+-------+-------+ |Num_Acc |place|catu|grav|sexe|trajet|secu|locp|actp|etatp|an_nais|num_veh| +------------+-----+----+----+----+------+----+----+----+-----+-------+-------+ |201100000001|1 |1 |3 |1 |5 |21 |0 |0 |0 |1995 |A01 | |201100000001|1 |1 |1 |1 |5 |11 |0 |0 |0 |1949 |B02 | |201100000002|1 |1 |1 |1 |0 |11 |0 |0 |0 |1967 |A01 | |201100000002|1 |1 |3 |1 |0 |21 |0 |0 |0 |1963 |B02 | |201100000003|1 |1 |1 |1 |1 |11 |0 |0 |0 |1989 |A01 | +------------+-----+----+----+----+------+----+----+----+-----+-------+-------+ only showing top 5 rows shape: 1078041 x 12
lieux.show(truncate=False, n=5)
+------------+----+----+----+----+----+---+---+---+----+----+----+------+-------+----+-----+----+----+ |Num_Acc |catr|voie|v1 |v2 |circ|nbv|pr |pr1|vosp|prof|plan|lartpc|larrout|surf|infra|situ|env1| +------------+----+----+----+----+----+---+---+---+----+----+----+------+-------+----+-----+----+----+ |201100000001|3 |39 |null|null|2 |2 |5 |535|0 |1 |1 |0 |60 |1 |0 |1 |0 | |201100000002|3 |41 |null|B |2 |0 |0 |700|0 |0 |0 |0 |0 |0 |0 |0 |0 | |201100000003|3 |39 |null|null|2 |2 |10 |600|0 |1 |1 |0 |60 |1 |0 |1 |99 | |201100000004|3 |39 |null|null|2 |0 |8 |400|0 |1 |1 |0 |58 |1 |0 |1 |0 | |201100000005|3 |39 |null|null|2 |2 |7 |450|0 |1 |1 |0 |0 |1 |0 |1 |3 | +------------+----+----+----+----+----+---+---+---+----+----+----+------+-------+----+-----+----+----+ only showing top 5 rows
import matplotlib.pyplot as plt
k = caracteristiques_rdd.map(lambda x: ("Agglo" if x.agg-1 else "Hors Agglo", 1)).reduceByKey(lambda x, y: x + y).collect()
labels = [x[0] for x in k]
sizes = [x[1] for x in k]
explode = (0, 0.1) # permet de faire ressortir la part "Autres"
fig1, ax1 = plt.subplots()
ax1.pie(sizes, explode=explode, labels=labels, autopct='%1.1f%%', shadow=True, startangle=90)
ax1.axis('equal')
ax1.title.set_text("Répartition des accidents survenus en et hors agglomération")
plt.show()
On observe qu'une grande majorité d'accidents à lieu en agglomération
# comparaison des taux de mortalité entre les accidents survenus en agglomération et hors agglomération
# on ne garde que les accidents mortels
usagers_morts = usagers_rdd.filter(lambda x: x.grav == 2)
# accidents mortels en agglomération
usagers_morts_horsagglo = usagers_morts.join(caracteristiques_rdd.filter(lambda x: x.agg == 1))
# accidents mortels en agglomération
usagers_morts_agglo = usagers_morts.join(caracteristiques_rdd.filter(lambda x: x.agg == 2))
labels = "Accidents mortels en agglomération", "Accidents mortels hors agglomération"
sizes = usagers_morts_agglo.count(), usagers_morts_horsagglo.count()
explode = (0, 0.1) # permet de faire ressortir la part
fig1, ax1 = plt.subplots()
ax1.pie(sizes, explode=explode, labels=labels, autopct='%1.1f%%', shadow=True, startangle=90)
ax1.axis('equal')
ax1.title.set_text("Répartition des accidents mortels survenus en et hors agglomération")
plt.show()
# on veut afficher le top 5 des departements avec le plus d'accidents
# on commence par map pour avoir un tuple (dep, 1)
# puis reduceByKey pour compter le nombre d'accidents par departement
# ensuite sortBy pour trier les departements par ordre décroissant
# et enfin un take pour afficher les 5 premiers
caracteristiques_rdd.map(lambda x: (x.dep, 1)).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], ascending=False).take(5)
[(750, 51653), (130, 31405), (930, 22592), (920, 21152), (940, 20289)]
On observe que le top cinq représente essentiellement Paris, excepté pour Marseille qui se place en seconde position
# on commence par map pour avoir un tuple (paris, 1) ou (hors paris, 1)
# puis reduceByKey pour compter le nombre d'accidents par departement
# enfin collect pour afficher les résultats
k = caracteristiques_rdd.map(lambda x: ("Paris" if x.dep in [750, 940, 920, 930] else "Hors Paris", 1)).reduceByKey(lambda x, y: x + y).collect()
print(k)
labels = [x[0] for x in k]
sizes = [x[1] for x in k]
explode = (0, 0.1) # permet de faire ressortir la part "Autres"
fig1, ax1 = plt.subplots()
ax1.pie(sizes, explode=explode, labels=labels, autopct='%1.1f%%', shadow=True, startangle=90)
ax1.axis('equal')
ax1.title.set_text("Répartition des accidents Paris vs Hors Paris entre 2011 et 2018")
plt.show()
[Stage 864:==============> (1 + 3) / 4]
[('Hors Paris', 368359), ('Paris', 115686)]
On observe qu'environ 1 accidents sur 4 à lieu à Paris, verifions si c'est le cas tout les ans
# create a subplot with 2 rows and 4 columns
fig, ax = plt.subplots(2, 4, figsize=(20, 10))
# on plot un camembert du nombre d'accidents à Paris et hors Paris pour chaque année entre 2011 et 2018
for i in range(11, 19):
k = caracteristiques_rdd.filter(lambda x: x.an == i).map(lambda x: ("Paris" if x.dep in [750, 930, 940, 920] else "Hors Paris", 1)).reduceByKey(lambda x, y: x + y).collect()
labels = k[0][0], k[1][0]
sizes = k[0][1], k[1][1]
explode = (0, 0.1) # permet de faire ressortir la part "Autres"
ax[int((i-11)/4)][(i-11)%4].pie(sizes, explode=explode, labels=labels, autopct='%1.1f%%', shadow=True, startangle=90)
ax[int((i-11)/4)][(i-11)%4].axis('equal')
ax[int((i-11)/4)][(i-11)%4].set_title(f"Accidents en {i+2000}")
les fichier geojson ont été récupéré sur https://github.com/gregoiredavid/france-geojson
import geopandas as gpd
sf = gpd.read_file('geojson/departements.geojson')
# on va maintenant afficher la répartition des accidents sur la carte de France
# on commence par créer un dictionnaire qui associe à chaque département le nombre d'accidents
# puis on crée une nouvelle colonne dans le dataframe sf qui contient le nombre d'accidents pour chaque département
# enfin on affiche la carte
#remove tout les accidents qui ne sont pas en France
k = caracteristiques_rdd.filter(lambda x: x.gps == "M")
accidents_par_dep = k.map(lambda x: ("2A" if x.dep == 201 else "2B" if x.dep == 202 else x.dep//10, 1)).reduceByKey(lambda x, y: x + y).map(lambda x: (str(x[0]).zfill(2) if isinstance(x[0], int) else x[0], x[1])).collectAsMap()
sf['nb_accidents'] = sf['code'].map(accidents_par_dep)
ax = sf.plot(column='nb_accidents', legend=True, figsize=(20, 10), cmap='OrRd', missing_kwds={'color': 'grey', "hatch": "///"})
ax.set_title("Répartition des accidents sur la carte de France")
# annote paris bouche du rhone
ax.annotate('Paris', xy=(2.35, 48.85), xytext=(2.35, 48.85), color='black', fontsize=12)
ax.annotate('Bouches du Rhone', xy=(5.35, 43.3), xytext=(5.35, 43.3), color='black', fontsize=12)
Text(5.35, 43.3, 'Bouches du Rhone')
# on va maintenant afficher la répartition normalisé des accidents mortels sur la carte de France afin de visualisé les departements les plus dangereux
# on commence par créer un dictionnaire qui associe à chaque département le nombre d'accidents mortels
# puis on crée une nouvelle colonne dans le dataframe sf qui contient le nombre d'accidents mortels pour chaque département
# enfin on affiche la carte
sf = gpd.read_file('geojson/departements.geojson')
# filter usagers df to keep only dead people
usagers_mort = usagers.filter(usagers.grav == 2)
usagers_implique = usagers.join(caracteristiques.filter(caracteristiques.gps == "M"), usagers.Num_Acc == caracteristiques.Num_Acc, 'inner').rdd
usagers_mort_carac = usagers_mort.join(caracteristiques.filter(caracteristiques.gps == "M"), usagers_mort.Num_Acc == caracteristiques.Num_Acc, 'inner').rdd
mort_par_dep = usagers_mort_carac.map(lambda x: ("2A" if x.dep == 201 else "2B" if x.dep == 202 else x.dep//10, 1)).reduceByKey(lambda x, y: x + y).map(lambda x: (str(x[0]).zfill(2) if isinstance(x[0], int) else x[0], x[1])).collectAsMap()
implique_par_dep = usagers_implique.map(lambda x: ("2A" if x.dep == 201 else "2B" if x.dep == 202 else x.dep//10, 1)).reduceByKey(lambda x, y: x + y).map(lambda x: (str(x[0]).zfill(2) if isinstance(x[0], int) else x[0], x[1])).collectAsMap()
sf['nb_morts'] = sf['code'].map(mort_par_dep)
sf['nm_implique'] = sf['code'].map(implique_par_dep)
sf['nb_morts_normalise'] = sf['nb_morts']/sf['nm_implique']
ax = sf.plot(column='nb_morts_normalise', legend=True, figsize=(20, 10), cmap='OrRd', missing_kwds={'color': 'grey', "hatch": "///"})
ax.set_title("La taux de mortalité par personne impliquées des accidents par département")
#sort sf by nb_morts_normalise
sf = sf.sort_values(by=['nb_morts_normalise'], ascending=False)
sf.head(10)
code | nom | geometry | nb_morts | nm_implique | nb_morts_normalise | |
---|---|---|---|---|---|---|
38 | 81 | Tarn | POLYGON ((1.99017 44.14945, 2.02477 44.15513, ... | 215 | 2250 | 0.095556 |
6 | 39 | Jura | POLYGON ((5.51854 47.30418, 5.52327 47.30548, ... | 192 | 2260 | 0.084956 |
30 | 48 | Lozère | POLYGON ((3.36135 44.97141, 3.37032 44.96998, ... | 85 | 1055 | 0.080569 |
11 | 55 | Meuse | POLYGON ((4.95099 49.23687, 4.96403 49.24526, ... | 111 | 1379 | 0.080493 |
27 | 24 | Dordogne | POLYGON ((0.62974 45.71457, 0.64015 45.69790, ... | 287 | 3592 | 0.079900 |
65 | 84 | Vaucluse | MULTIPOLYGON (((4.88812 44.33169, 4.89533 44.3... | 311 | 3901 | 0.079723 |
39 | 82 | Tarn-et-Garonne | POLYGON ((1.06408 44.37851, 1.08240 44.38141, ... | 185 | 2351 | 0.078690 |
94 | 79 | Deux-Sèvres | POLYGON ((-0.89196 46.97582, -0.87973 46.97580... | 224 | 2870 | 0.078049 |
7 | 40 | Landes | POLYGON ((-1.25389 44.46760, -1.19248 44.48121... | 254 | 3336 | 0.076139 |
81 | 46 | Lot | POLYGON ((1.44826 45.01931, 1.46198 45.01370, ... | 112 | 1501 | 0.074617 |
# on va maintenant afficher la répartition normalisé des accidents mortels sur la carte de France afin de visualisé les departements les plus dangereux
# on commence par créer un dictionnaire qui associe à chaque département le nombre d'accidents mortels
# puis on crée une nouvelle colonne dans le dataframe sf qui contient le nombre d'accidents mortels pour chaque département
# enfin on affiche la carte
sf = gpd.read_file('geojson/departements.geojson')
# filter usagers df to keep only dead people
usagers_mort = usagers.filter(usagers.grav == 1)
usagers_implique = usagers.join(caracteristiques.filter(caracteristiques.gps == "M"), usagers.Num_Acc == caracteristiques.Num_Acc, 'inner').rdd
usagers_mort_carac = usagers_mort.join(caracteristiques.filter(caracteristiques.gps == "M"), usagers_mort.Num_Acc == caracteristiques.Num_Acc, 'inner').rdd
mort_par_dep = usagers_mort_carac.map(lambda x: ("2A" if x.dep == 201 else "2B" if x.dep == 202 else x.dep//10, 1)).reduceByKey(lambda x, y: x + y).map(lambda x: (str(x[0]).zfill(2) if isinstance(x[0], int) else x[0], x[1])).collectAsMap()
implique_par_dep = usagers_implique.map(lambda x: ("2A" if x.dep == 201 else "2B" if x.dep == 202 else x.dep//10, 1)).reduceByKey(lambda x, y: x + y).map(lambda x: (str(x[0]).zfill(2) if isinstance(x[0], int) else x[0], x[1])).collectAsMap()
sf['nb_morts'] = sf['code'].map(mort_par_dep)
sf['nm_implique'] = sf['code'].map(implique_par_dep)
sf['nb_morts_normalise'] = sf['nb_morts']/sf['nm_implique']
ax = sf.plot(column='nb_morts_normalise', legend=True, figsize=(20, 10), cmap='Greens', missing_kwds={'color': 'grey', "hatch": "///"})
ax.set_title("La taux de personnes indemnes par personne impliquées des accidents par département")
#sort sf by nb_morts_normalise
sf = sf.sort_values(by=['nb_morts_normalise'], ascending=False)
sf.head(10)
code | nom | geometry | nb_morts | nm_implique | nb_morts_normalise | |
---|---|---|---|---|---|---|
67 | 94 | Val-de-Marne | POLYGON ((2.33190 48.81701, 2.36395 48.81631, ... | 11806 | 25550 | 0.462074 |
44 | 92 | Hauts-de-Seine | POLYGON ((2.29097 48.95097, 2.32697 48.94536, ... | 5398 | 11690 | 0.461762 |
19 | 93 | Seine-Saint-Denis | POLYGON ((2.55306 49.00982, 2.56579 49.01240, ... | 8012 | 17463 | 0.458799 |
36 | 75 | Paris | POLYGON ((2.31989 48.90046, 2.38515 48.90201, ... | 16078 | 36585 | 0.439470 |
54 | 33 | Gironde | POLYGON ((-1.02574 45.57469, -0.92654 45.49613... | 7069 | 16496 | 0.428528 |
81 | 46 | Lot | POLYGON ((1.44826 45.01931, 1.46198 45.01370, ... | 642 | 1501 | 0.427715 |
89 | 91 | Essonne | POLYGON ((2.22655 48.77610, 2.23297 48.76619, ... | 5240 | 12296 | 0.426155 |
33 | 64 | Pyrénées-Atlantiques | POLYGON ((-0.24283 43.58498, -0.23503 43.58336... | 4870 | 11432 | 0.425997 |
34 | 69 | Rhône | POLYGON ((4.38807 46.21979, 4.38829 46.24796, ... | 7632 | 18235 | 0.418536 |
90 | 95 | Val-d'Oise | POLYGON ((1.70436 49.23220, 1.72966 49.22920, ... | 5624 | 13475 | 0.417365 |
lat_min, lat_max, long_min, long_max = 4886809, 4886971, 230856, 231103
rond_point_marceldassault = caracteristiques_rdd.filter(lambda x: x.lat != None and x.long != None).filter(lambda x: lat_min <= x.lat <= lat_max and long_min <= x.long <= long_max).count() #175k accidents sans coordonnées GPS
lat_min, lat_max, long_min, long_max = 4887284, 4887300, 229537, 229819
rond_point_arctriomphe= caracteristiques_rdd.filter(lambda x: x.lat != None and x.long != None).filter(lambda x: lat_min <= x.lat <= lat_max and long_min <= x.long <= long_max).count() # 1 accident
print(f"Nombre d'accidents sur le rond point Marcel Dassault : {rond_point_marceldassault}")
print(f"Nombre d'accidents sur le rond point de l'Arc de Triomphe : {rond_point_arctriomphe}")
[Stage 910:==============> (1 + 3) / 4]
Nombre d'accidents sur le rond point Marcel Dassault : 34 Nombre d'accidents sur le rond point de l'Arc de Triomphe : 1
mapping = {1: "Hors Intersection", 2: "Intersection en X", 3: "Intersection en T", 4: "Intersection en Y", 5: "Intersection à +4 branches", 6: "Giratoies", 7: "Places", 8: "Passage à niveau", 9: "Autres"}
k = caracteristiques_rdd.map(lambda x: (mapping.get(x.int, "Autres"), 1)).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], ascending=False).collect()
labels = [x[0] for x in k]
sizes = [x[1] for x in k]
fig1, ax1 = plt.subplots()
ax1.pie(sizes, labels=labels, autopct='%1.1f%%', shadow=True, startangle=90)
ax1.axis('equal')
ax1.title.set_text("Répartition des accidents par type d'intersection")
plt.show()
# on va maintenant rassembler les types d'intersection en 3 catégories : Intersection, Giratoire et Autres
# on commence par créer un dictionnaire pour faire le mapping
# puis on map pour remplacer les types d'intersection par les catégories
# ensuite on fait un reduceByKey pour compter le nombre d'accidents par catégorie
mapping = {2: "Carrefour", 3: "Intersection", 4: "Intersection", 5: "Intersection", 6: "Giratoies", 7: "Autres", 8: "Autres", 9: "Autres"}
k = caracteristiques_rdd.filter(lambda x: x.int != 1).map(lambda x: (mapping.get(x.int, "Autres"), 1)).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], ascending=False).collect()
labels = [x[0] for x in k]
sizes = [x[1] for x in k]
fig1, ax1 = plt.subplots()
ax1.pie(sizes, labels=labels, autopct='%1.1f%%', shadow=True, startangle=90)
ax1.axis('equal')
ax1.title.set_text("Répartition des accidents par type d'intersection")
plt.show()
On observe qu'un accident sur deux intervient en rond-point ou en carrefour, surement dû à un refus de priorité
sf = gpd.read_file('geojson/departements.geojson')
# on veut montrer le type d'intersection avec le plus d'accidents par département
# on commence par créer un dictionnaire pour faire le mapping
# puis on map pour remplacer les types d'intersection par les catégories
# ensuite on fait un reduceByKey pour compter le nombre d'accidents par catégorie
mappings = {2: "Carrefour", 3: "Intersection en T/Y", 4: "Intersection en T/Y", 5: "Intersection à plus de 4 branches", 6: "Giratoies", 7: "Place", 8: "Passage à niveau", 9: "Autres"}
#k = caracteristiques_rdd.filter(lambda x: x.int != 1 and x.int != None).map(lambda x: (x.dep, mapping.get(x.int, "Autres"), 1))
k = caracteristiques.dropna(subset=['int', 'dep']).filter(caracteristiques.int != 1).filter(caracteristiques.gps == "M").toPandas()
k['int'] = k["int"].map(mappings)
# divse dep par 10 sauf pour 201 et 202 qu'on remplace par 2A et 2B
k['dep'] = k['dep'].apply(lambda x: "2A" if x == 201 else "2B" if x == 202 else str(x//10).zfill(2))
k = k.groupby(['dep', 'int']).size().reset_index(name='counts')
#for each dep, keep the intersection type with the most accidents
k = k.groupby('dep').apply(lambda x: x[x.counts == x.counts.max()]).reset_index(drop=True)
#k = k.groupby('dep').apply(lambda x: x[x.counts == x.counts.max()]).reset_index(drop=True).drop(columns=['counts'])
#merge with sf
sf = sf.merge(k, left_on='code', right_on='dep', how='left')
#plot
ax = sf.plot(column='int', figsize=(20, 10), legend=True)
ax.set_title("Type d'intersection le plus dangereux par département")
ax.annotate('Ardèche', xy=(4.5, 44.5), xytext=(4.5, 44.5), fontsize=12, color='white')
Text(4.5, 44.5, 'Ardèche')
# scatter plot la longitude et la latitude
fig, ax = plt.subplots(figsize=(20, 10))
k = caracteristiques_rdd.filter(lambda x: x.long != None and x.long <= 1.5*10e5).filter(lambda x: x.lat != None and 3*10e5 <= x.lat <= 5.3*10e5)
ax.scatter(k.map(lambda x: x.long).collect(), k.map(lambda x: x.lat).collect(), s=0.1)
ax.set_title("Répartition des accidents en France")
ax.set_xlabel("Longitude")
ax.set_ylabel("Latitude")
plt.show()
# nombre d'accident suivant le motif de deplacement
usagers_rdd_dropdup = usagers.dropDuplicates(["Num_Acc"]).rdd
mappings = {1: "Domicile - travail", 2: "Domicile - ecole", 3:"Domicile - course", 4:"Deplacement pro", 5:"Loisir", 9:"Autre", 0:"Autre"}
k = usagers_rdd_dropdup.filter(lambda x: x.trajet != None).map(lambda x: (mappings.get(x.trajet) or x.trajet, 1)).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], ascending=False).collect()
#plot a chart with the number of accident by type of movement
labels = [x[0] for x in k]
sizes = [x[1] for x in k]
fig1, ax1 = plt.subplots()
ax1.pie(sizes, labels=labels, autopct='%1.1f%%', shadow=True, startangle=90)
ax1.axis('equal')
ax1.title.set_text("Répartition des accidents par motif de déplacement")
plt.show()
# nombre d'accident suivant le motif de deplacement
usagers_rdd_dropdup = usagers.dropDuplicates(["Num_Acc"]).rdd
# on ne garde que les trajet domicile - travail
k = usagers_rdd_dropdup.filter(lambda x: x.trajet != None and x.trajet == 1)
k = k.map(lambda x: (x.sexe, 1)).reduceByKey(lambda x, y: x + y).collect()
print(k)
[Stage 941:> (0 + 4) / 4]
[(1, 55598), (2, 23249)]
# affichons le sexe des victimes les plus touché par departement
sf = gpd.read_file('geojson/departements.geojson')
usagers_implique = usagers.join(caracteristiques.filter(caracteristiques.gps == "M"), usagers.Num_Acc == caracteristiques.Num_Acc, 'inner').rdd
# remplaçon les code pour les departements
#usagers_implique = usagers_implique.map(lambda x: ((str(x[0]).zfill(2) if x.dep != 201 and x.dep != 202 else x.dep, 1)))*
# lambda x: (("2A" is x.dep == 201 else "2B" if x.dep == 202 else str(x[0]).zfill(2), 1))
# compter le nombre de mort homme et femme par departement
k_hommes_mort = usagers_implique.filter(lambda x: x.sexe == 1).filter(lambda x: x.grav == 2).map(lambda x: (x.dep, 1)).reduceByKey(lambda x, y: x + y)
k_hommes_mort = k_hommes_mort.map(lambda x: ("2A" if x[0] == 201 else "2B" if x[0] == 202 else x[0]//10, x[1])).map(lambda x: (str(x[0]).zfill(2) if isinstance(x[0], int) else x[0], x[1])).collectAsMap()
k_femmes_mort = usagers_implique.filter(lambda x: x.sexe == 2).filter(lambda x: x.grav == 2).map(lambda x: (x.dep, 1)).reduceByKey(lambda x, y: x + y)
k_femmes_mort = k_femmes_mort.map(lambda x: ("2A" if x[0] == 201 else "2B" if x[0] == 202 else x[0]//10, x[1])).map(lambda x: (str(x[0]).zfill(2) if isinstance(x[0], int) else x[0], x[1])).collectAsMap()
# compte le nombre de homme et femme par departement
k_hommes_impliquee = usagers_implique.filter(lambda x: x.sexe == 1).map(lambda x: (x.dep, 1)).reduceByKey(lambda x, y: x + y)
k_hommes_impliquee = k_hommes_impliquee.map(lambda x: ("2A" if x[0] == 201 else "2B" if x[0] == 202 else x[0]//10, x[1])).map(lambda x: (str(x[0]).zfill(2) if isinstance(x[0], int) else x[0], x[1])).collectAsMap()
k_femmes_impliquee = usagers_implique.filter(lambda x: x.sexe == 2).map(lambda x: (x.dep, 1)).reduceByKey(lambda x, y: x + y)
k_femmes_impliquee = k_femmes_impliquee.map(lambda x: ("2A" if x[0] == 201 else "2B" if x[0] == 202 else x[0]//10, x[1])).map(lambda x: (str(x[0]).zfill(2) if isinstance(x[0], int) else x[0], x[1])).collectAsMap()
# on ajoute les données au geojson
sf['hommes_mort'] = sf['code'].map(k_hommes_mort)
sf['femmes_mort'] = sf['code'].map(k_femmes_mort)
sf['hommes_implique'] = sf['code'].map(k_hommes_impliquee)
sf['femmes_implique'] = sf['code'].map(k_femmes_impliquee)
# on calcule le pourcentage de mort par departement
sf['pourcentage_mort_homme'] = sf['hommes_mort'] / sf['hommes_implique']
sf['pourcentage_mort_femme'] = sf['femmes_mort'] / sf['femmes_implique']
# on affiche pour chaque departement le sexe le plus touché
sf['sexe_mort'] = sf.apply(lambda x: "Homme" if x['pourcentage_mort_homme'] > x['pourcentage_mort_femme'] else "Femme", axis=1)
# on plot 3 graphique, les departements ou les hommes sont les plus touchés, les departements ou les femmes sont les plus touchés et les departements les plus touché suivant le sexe
fig, ax = plt.subplots(1, 3, figsize=(20, 10))
# on plot les taux de mortalité hommes par departement
sf.plot(column='pourcentage_mort_homme', ax=ax[0], legend=True, legend_kwds={'label': "Pourcentage de mort par département", 'orientation': "horizontal"})
ax[0].set_title("Pourcentage de mort par département pour les hommes")
# on plot les taux de mortalité femmes par departement
sf.plot(column='pourcentage_mort_femme', ax=ax[1], legend=True, legend_kwds={'label': "Pourcentage de mort par département", 'orientation': "horizontal"})
ax[1].set_title("Pourcentage de mort par département pour les femmes")
# on plot le sexe le plus touché par departement
sf.plot(column='sexe_mort', ax=ax[2], legend=True)
ax[2].set_title("Sexe le plus touché par département")
plt.show()
sf.head(10)
code | nom | geometry | hommes_mort | femmes_mort | hommes_implique | femmes_implique | pourcentage_mort_homme | pourcentage_mort_femme | sexe_mort | |
---|---|---|---|---|---|---|---|---|---|---|
0 | 02 | Aisne | POLYGON ((3.17270 50.01200, 3.18220 50.01234, ... | 229 | 59 | 2964 | 1346 | 0.077260 | 0.043834 | Homme |
1 | 10 | Aube | POLYGON ((3.41479 48.39027, 3.42208 48.41334, ... | 133 | 49 | 2324 | 1268 | 0.057229 | 0.038644 | Homme |
2 | 14 | Calvados | POLYGON ((-1.11962 49.35557, -1.11503 49.36240... | 182 | 64 | 4331 | 2422 | 0.042023 | 0.026424 | Homme |
3 | 15 | Cantal | POLYGON ((2.50841 45.47850, 2.52444 45.48070, ... | 69 | 22 | 966 | 441 | 0.071429 | 0.049887 | Homme |
4 | 28 | Eure-et-Loir | POLYGON ((0.81482 48.67016, 0.82767 48.68072, ... | 217 | 65 | 3238 | 1557 | 0.067017 | 0.041747 | Homme |
5 | 35 | Ille-et-Vilaine | MULTIPOLYGON (((-2.00690 48.56611, -2.04621 48... | 305 | 99 | 8286 | 4758 | 0.036809 | 0.020807 | Homme |
6 | 39 | Jura | POLYGON ((5.51854 47.30418, 5.52327 47.30548, ... | 135 | 57 | 1558 | 702 | 0.086650 | 0.081197 | Homme |
7 | 40 | Landes | POLYGON ((-1.25389 44.46760, -1.19248 44.48121... | 195 | 59 | 2234 | 1102 | 0.087287 | 0.053539 | Homme |
8 | 42 | Loire | POLYGON ((3.89954 46.27591, 3.90551 46.27160, ... | 179 | 73 | 7254 | 4159 | 0.024676 | 0.017552 | Homme |
9 | 45 | Loiret | POLYGON ((1.99409 48.28658, 2.00724 48.28469, ... | 259 | 74 | 4484 | 2350 | 0.057761 | 0.031489 | Homme |
# accident
vehicules_rdd_dropdup = vehicules.dropDuplicates(["Num_Acc", "catv"]).drop().rdd
mappings = {1: "deux roues", 2: "deux roues", 4:"deux roues", 5:"deux roues", 6:"deux roues", 7: "voiture", 30:"deux roues", 31:"deux roues", 32:"deux roues", 33:"deux roues", 34:"deux roues", 18: "bus", 37: "bus", 38: "bus", 19:"tramway", 40:"tramway", 99: "piéton"}
# barplot du nombre d'accident par type de véhicule
k = vehicules_rdd_dropdup.filter(lambda x: x.catv != None).map(lambda x: (mappings.get(x.catv, "autres"), 1)).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], ascending=False).filter(lambda x: x[0] != "autres").collect()
# barplot
labels = [x[0] for x in k]
sizes = [x[1] for x in k]
fig, ax = plt.subplots(figsize=(20, 10))
ax.bar(labels, sizes)
ax.set_title("Répartition des accidents par type de véhicule")
ax.set_xlabel("Type de véhicule")
ax.set_ylabel("Nombre d'accidents")
plt.show()
#piechar taux d'accident mortel par type de véhicule
usagers_dropna = usagers.dropna(subset=["grav"])
#filter the data to keep only the accident with a death
usagers_dropna = usagers_dropna.filter(usagers_dropna.grav == 2)
#join the data with the vehicules data to keep only the vehicule type
usagers_dropna = usagers_dropna.join(vehicules, usagers_dropna.Num_Acc == vehicules.Num_Acc, "inner")
mappings = {1: "deux roues", 2: "deux roues", 4:"deux roues", 5:"deux roues", 6:"deux roues", 7: "voiture", 30:"deux roues", 31:"deux roues", 32:"deux roues", 33:"deux roues", 34:"deux roues", 18: "bus", 37: "bus", 38: "bus", 19:"tramway", 40:"tramway", 99: "piéton"}
# transform to rdd and keep only the vehicule type
usagers_mort_typev = usagers_dropna.rdd.filter(lambda x: x.catv != None).map(lambda x: (mappings.get(x.catv, "autres"), 1)).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], ascending=False).filter(lambda x: x[0] != "autres").collect()
# barplot
labels = [x[0] for x in usagers_mort_typev]
sizes = [x[1] for x in usagers_mort_typev]
fig, ax = plt.subplots(figsize=(20, 10))
ax.bar(labels, sizes)
ax.set_title("Répartition des personnes mortes dans un accident par type de véhicule")
ax.set_xlabel("Type de véhicule")
ax.set_ylabel("Nombre d'accidents")
plt.show()
# normalisation des taux de mort suivant le nombre d'accident par type de véhicule
# barplot
# divide item in usagers_mort_typev by item in k
k2 = [(x[0], 100* (x[1]/y[1])) for x, y in zip(usagers_mort_typev, k)]
labels = [x[0] for x in k2]
sizes = [x[1] for x in k2]
fig, ax = plt.subplots(figsize=(20, 10))
ax.bar(labels, sizes)
ax.set_title("Répartition des Taux de mortalité par type de véhicule")
ax.set_xlabel("Type de véhicule")
ax.set_ylabel("Taux de mortalité")
plt.show()
# graphique repartition des accidents par age pour chaque année
# create a subplot with 2 rows and 4 columns
fig, ax = plt.subplots(2, 4, figsize=(20, 10))
# plot le top3 des années de naissance des conducteurs par année dans un barplot
for i in range(11, 19):
#acc_per_year = usagers_rdd.filter(lambda x: str(x.Num_Acc)[:4] == str(i+2000)).groupBy(lambda x: x.Num_Acc).distinct().count()
acc_per_year = caracteristiques_rdd.filter(lambda x: x.an == i).count()
k = usagers_rdd.filter(lambda x: x.an_nais != None).filter(lambda x: str(x.Num_Acc)[:4] == str(i+2000)).map(lambda x: (i+2000 - x.an_nais, 1)).map(lambda x: ("25 ans et -" if x[0] < 25 else "25-30 ans" if 25 <= x[0] < 30 else "30-35 ans" if 30 <= x[0] < 35 else '35-40 ans' if 35 <= x[0] < 40 else '40-45 ans' if 40 <= x[0] < 45 else '45-50 ans' if 45 <= x[0] < 50 else "50 ans et +", 1)).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[0]).collect()
labels = [x[0] for x in k]
sizes = [x[1] for x in k]
ax[int((i-11)/4)][(i-11)%4].pie(sizes, labels=labels, autopct='%1.1f%%', shadow=True, startangle=90)
ax[int((i-11)/4)][(i-11)%4].axis('equal')
ax[int((i-11)/4)][(i-11)%4].set_title(f"{acc_per_year} accidents en {i+2000}")
fig.legend(labels, loc="upper right")
<matplotlib.legend.Legend at 0x7fb210a92550>