Apache Spark™ est un moteur puissant permettant de réaliser du traitement de données à grande échelle et en temps réel.
Il propose des API en Java, Scala, Python et R, peut fonctionner avec Hadoop, Mesos ou en standalone.
Il peut accéder à plusieurs sources de données telles que HDFS, Cassandra, HBase et S3.
Intéressons-nous au concours proposé la MAIF sur datasciences.net : Décodage d’une formule de pricing.
Nous traiterons le problème en Scala, directement dans le shell Spark.
Nous n’aborderons pas la mise en place de la solution Spark mais proposons une image Docker permettant de lancer facilement un cluster.
Les extraits de code nécessitent des imports que nous ne présentons pas.
Commençons par définir la structure du jeu de données. Pour chaque colonne, nous donnons le nom du champ, le type de données ainsi que si le champ peut être nul :
val customSchema = StructType(Array(
StructField("id", IntegerType, true),
StructField("annee_naissance", IntegerType, true),
StructField("annee_permis", IntegerType, true),
StructField("marque", StringType, true),
StructField("puis_fiscale", IntegerType, true),
StructField("anc_veh", IntegerType, true),
StructField("codepostal", StringType, true),
StructField("energie_veh", StringType, true),
StructField("kmage_annuel", IntegerType, true),
StructField("crm", IntegerType, true),
StructField("profession", StringType, true),
StructField("var1", IntegerType, true),
...
StructField("prime_tot_ttc", DoubleType, true)))
Nous pouvons ensuite importer les données selon ce schéma. Pour utiliser ces commandes, il est nécessaire d’exécuter Spark avec l’option --packages com.databricks:spark-csv_2.10:1.3.0
.
var df = sqlContext.read.format("com.databricks.spark.csv").
option("header", "true").option("delimiter", ";").schema(customSchema).
load("/path/to/data/ech_apprentissage.csv")
var test = sqlContext.read.format("com.databricks.spark.csv").
option("header", "true").option("delimiter", ";").schema(customSchema).
load("/path/to/data/ech_test.csv")
Nous complétons ensuite la colonne à prédire sur l’échantillon de test ainsi que les valeurs non complétées (na).
test = test.drop("prime_tot_ttc").withColumn("prime_tot_ttc",expr("-999"))
df = df.na.fill(-999).na.fill("EMPTY")
test = test.na.fill(-999).na.fill("EMPTY")
Nous supprimons les données que nous ne jugeons pas nécessaires à notre étude :
df = df.drop("anc_veh").drop("codepostal")
test = test.drop("anc_veh").drop("codepostal")
Les algorithmes n’aiment pas trop les variables de type chaîne de caractères.Nous créons une liste contenant les noms des champs contenant des chaînes de caractères afin de pouvoir les traiter à part.
var dfString = df
val exprs = dfString.dtypes.filter(_._2 != "StringType").map(ct => ct._1).toList
exprs.foreach(cname => dfString = dfString.drop(cname))
Puis nous indexons toutes ces variables afin de les remplacer par des nombres (qui pourront être traitées par les algorithmes).
val stringIndexer: Array[org.apache.spark.ml.PipelineStage] = dfString.columns.map(cname => new StringIndexer().setInputCol(s"${cname}").setOutputCol(s"${cname}_index"))
var stages: Array[org.apache.spark.ml.PipelineStage] = stringIndexer
val pipelinePrepare = new Pipeline().setStages(stages)
val all = df.unionAll(test)
val preparation = pipelinePrepare.fit(all)
Nous ajoutons de nouvelles colonnes contenant les index.
df = preparation.transform(df)
test = preparation.transform(test)
Nous supprimons ensuite les champs chaînes de caractères.
val exprs = df.dtypes.filter(_._2 == "StringType").map(ct => ct._1).toList
exprs.foreach(cname => df = df.drop(cname))
val exprs = test.dtypes.filter(_._2 == "StringType").map(ct => ct._1).toList
exprs.foreach(cname => test = test.drop(cname))