Implémentation de Spark de randomSplit()
La fonction de signature de randomSplit() comprend une liste de poids et une spécification de graine. La liste de poids est de spécifier le nombre de divisions et le pourcentage (approximatif) dans chacune et la graine est pour la reproductibilité. Le rapport est approximatif en raison de la nature de son calcul.
Par exemple, le code suivant de la figure 3 diviserait df en deux trames de données, train_df
étant 80% et test_df
étant 20% de la trame de données d’origine. En utilisant la même valeur pour la graine aléatoire, nous nous attendons à ce que les mêmes points de données soient dans la même division si nous devions réexécuter le script ou que Spark reconstruit en interne les divisions.
Sous le capot
Le processus suivant est répété pour générer chaque trame de données fractionnée: partitionnement, tri dans les partitions et échantillonnage de Bernoulli. Si le bloc de données d’origine n’est pas mis en cache, les données seront récupérées, re-partitionnées et triées pour chaque calcul de fractionnement. C’est la source d’anomalies potentielles. En résumé, randomSplit() équivaut à exécuter sample() pour chaque division, le pourcentage à échantillonner changeant avec la division en cours d’exécution. Cela est évident si vous examinez le code source de randomSplit() dans PySpark3. Ce blog provides fournit également plus d’informations et de visuels sur la façon dont randomSplit() est implémenté.
Passons en revue un exemple. La figure 4 est un diagramme de l’échantillon () pour chaque division, en commençant par la division 0.80.
Spark utilise l’échantillonnage de Bernoulli, qui peut être résumé comme générant des nombres aléatoires pour un élément (point de données) et l’acceptant dans une division si le nombre généré se situe dans une certaine plage, déterminée par le rapport de division. Pour une trame de données fractionnée de 0,8, la plage d’acceptation pour l’échantillonneur de cellules de Bernoulli serait de.
Le même processus d’échantillonnage est suivi pour la division de 0,20 dans la figure 5, avec seulement les limites d’acceptation changeant en .
Le bloc de données est à nouveau récupéré, partitionné et trié dans les partitions. Vous pouvez voir dans l’exemple que les partitions RDD sont idempotentes. Ce qui signifie que les points de données de chaque partition de la figure 4 restent dans la même partition de la figure 5. Par exemple, les points b et c sont dans la partition 1 dans les figures 4 et 5. De plus, la graine associée à chaque partition reste toujours constante et l’ordre dans les partitions est identique. Ces trois points sont fondamentaux à la fois pour sample() et randomSplit(). Veiller à ce que le même échantillon soit produit avec la même graine dans le premier, et garantir l’absence de doublons ou de points de données disparaissant dans le second.
Solutions Pour Éviter les incohérences
La résolution de ces problèmes consiste à s’assurer que les partitions RDD et l’ordre de tri sont idempotents. L’une des trois méthodes suivantes le garantit et peut être appliquée: 1) mise en cache du bloc de données avant les opérations 2) repartitionnement par une colonne ou un ensemble de colonnes, et 3) utilisation de fonctions d’agrégation⁵. Un exemple de chaque méthode est illustré à la figure 6.
La mise en cache de la trame de données d’origine entraîne la conservation du contenu de la partition en mémoire. Ainsi, au lieu de récupérer à nouveau les données, de les partitionner et de les trier, Spark poursuit les opérations en utilisant les données partitionnées en mémoire. Notez que cache() est un alias pour persist(pyspark.StorageLevel.memory_only)
qui peut ne pas être idéal si vous avez des limitations de mémoire. Au lieu de cela, vous pouvez envisager d’utiliser persist(pyspark.StorageLevel.memory_and_disk_only)
. S’il n’y a pas de mémoire ou d’espace disque disponible, Spark récupérera et partitionnera les données à partir de zéro, il peut donc être sage de surveiller cela à partir de l’interface utilisateur Web Spark. La mise en cache est la solution que j’ai choisie dans mon cas.
Résumé et points à retenir
La morale de l’histoire est la suivante: si un comportement inattendu se produit dans Spark, il vous suffit de creuser un peu plus profondément! Voici un résumé de tous les points clés de cet article:
- randomSplit() équivaut à appliquer sample() sur votre bloc de données plusieurs fois, chaque échantillon récupérant, partitionnant et triant votre bloc de données dans des partitions.
- La distribution des données entre les partitions et l’ordre de tri est importante pour randomSplit() et sample(). Si l’un ou l’autre change lors de la récupération des données, il peut y avoir des doublons ou des valeurs manquantes entre les divisions et le même échantillon utilisant la même graine peut produire des résultats différents.
- Ces incohérences peuvent ne pas se produire à chaque exécution, mais pour les éliminer complètement, persistez (c’est-à-dire mettez en cache) votre bloc de données, répartissez-le sur une ou plusieurs colonnes ou appliquez des fonctions d’agrégat telles que groupBy.