Spark unter der Haube: randomSplit() und sample() Innenleben

Spark-Implementierung von randomSplit()

Die Signaturfunktion von randomSplit() enthält eine Gewichtungsliste und eine Seed-Spezifikation. Die Gewichtsliste soll die Anzahl der Splits und den Prozentsatz (ungefähr) in jedem angeben, und der Samen dient der Reproduzierbarkeit. Das Verhältnis ist aufgrund der Art seiner Berechnung ungefähr.

Beispielsweise würde der folgende Code in Abbildung 3 df in zwei Datenrahmen aufteilen, wobei train_df 80% und test_df 20% des ursprünglichen Datenrahmens beträgt. Wenn wir denselben Wert für random Seed verwenden, erwarten wir, dass sich dieselben Datenpunkte im selben Split befinden, wenn wir das Skript erneut ausführen oder Spark die Splits intern neu erstellt.

Abbildung 3: randomSplit() Signaturfunktionsbeispiel

Unter der Haube

Der folgende Vorgang wird wiederholt, um jeden geteilten Datenrahmen zu generieren: Partitionierung, Sortierung innerhalb von Partitionen und Bernoulli-Abtastung. Wenn der ursprüngliche Datenrahmen nicht zwischengespeichert wird, werden die Daten für jede geteilte Berechnung erneut abgerufen, neu partitioniert und neu sortiert. Dies ist die Quelle potenzieller Anomalien. Zusammenfassend ist randomSplit() äquivalent zum Ausführen von sample() für jeden Split, wobei sich der Prozentsatz zum Sample mit dem durchgeführten Split ändert. Dies wird deutlich, wenn Sie den Quellcode für randomSplit() in PySpark3 untersuchen. Dieser Blog⁴ bietet auch einige weitere Informationen und Grafiken zur Implementierung von randomSplit().

Lassen Sie uns ein Beispiel durchgehen. Abbildung 4 zeigt ein Diagramm der Stichprobe () für jeden Split, beginnend mit dem 0,80-Split.

Abbildung 4: Prozess der Erzeugung des 0.8 Split. Identisch mit sample() Implementierung.

Spark verwendet Bernoulli-Sampling, das so zusammengefasst werden kann, dass Zufallszahlen für ein Element (Datenpunkt) generiert und in einen Split aufgenommen werden, wenn die generierte Zahl in einen bestimmten Bereich fällt, der durch das Split-Verhältnis bestimmt wird. Für einen 0,8-Split-Datenrahmen wäre der Akzeptanzbereich für den Bernoulli-Zell-Sampler .

Der gleiche Stichprobenprozess wird für den 0,20-Split in Abbildung 5 befolgt, wobei sich nur die Akzeptanzgrenzen ändern.

Abbildung 5: Prozess der Erzeugung des 0.2 Split. Identisch mit sample(). Umsetzung. Der Partitionsinhalt bleibt konstant und die Sortierreihenfolge bleibt erhalten, um eine gültige Aufteilung zu gewährleisten.

Der Datenrahmen wird erneut abgerufen, partitioniert und innerhalb von Partitionen sortiert. Sie können im Beispiel sehen, dass RDD-Partitionen idempotent sind. Was bedeutet, dass die Datenpunkte in jeder Partition in Abbildung 4, bleiben in der gleichen Partition in Abbildung 5. Beispielsweise befinden sich die Punkte b und c in Partition 1 in Abbildung 4 und 5. Darüber hinaus bleibt der jeder Partition zugeordnete Startwert immer konstant und die Reihenfolge innerhalb der Partitionen ist identisch. Alle drei Punkte sind grundlegend für sample() und randomSplit() . Sicherstellen, dass dieselbe Stichprobe mit demselben Saatgut im ersteren erzeugt wird, und Gewährleistung, dass in letzterem keine Duplikate oder verschwindenden Datenpunkte vorhanden sind.

Lösungen zur Vermeidung von Inkonsistenzen

Die Behebung dieser Probleme besteht darin, sicherzustellen, dass RDD-Partitionen und Sortierreihenfolge idempotent sind. Jede der folgenden drei Methoden stellt dies sicher und kann angewendet werden: 1) Zwischenspeichern des Datenrahmens vor Operationen 2) Neupartitionierung nach einer Spalte oder einer Reihe von Spalten und 3) Verwenden von Aggregatfunktionen⁵. Ein Beispiel für jede Methode ist in Abbildung 6 dargestellt.

Abbildung 6: Drei verschiedene Methoden, um Inkonsistenzen in randomSplit() und sample() zu vermeiden

Das Zwischenspeichern des ursprünglichen Datenrahmens führt dazu, dass Partitionsinhalte im Speicher gehalten werden. Anstatt Daten erneut abzurufen, zu partitionieren und zu sortieren, setzt Spark die Vorgänge mit den partitionierten Daten im Speicher fort. Beachten Sie, dass cache() ein Alias für persist(pyspark.StorageLevel.memory_only) was möglicherweise nicht ideal ist, wenn Sie Speicherbeschränkungen haben. Stattdessen können Sie persist(pyspark.StorageLevel.memory_and_disk_only) . Wenn kein Speicher oder Festplattenspeicher verfügbar ist, ruft Spark die Daten erneut ab und partitioniert sie von Grund auf neu. Caching ist die Lösung, die ich in meinem Fall gewählt habe.

Zusammenfassung und wichtige Erkenntnisse

Die Moral der Geschichte lautet: Wenn in Spark unerwartetes Verhalten auftritt, müssen Sie nur etwas tiefer graben! Hier ist eine Zusammenfassung aller wichtigen Punkte dieses Artikels:

  • randomSplit() entspricht der mehrfachen Anwendung von sample() auf Ihren Datenrahmen, wobei jedes Sample Ihren Datenrahmen innerhalb von Partitionen erneut abruft, partitioniert und sortiert.
  • Die Datenverteilung über Partitionen und Sortierreihenfolge ist sowohl für randomSplit() als auch für sample() wichtig. Wenn sich beide beim erneuten Abrufen der Daten ändern, kann es zu Duplikaten oder fehlenden Werten über die Teilungen hinweg kommen, und dieselbe Stichprobe mit demselben Startwert kann zu unterschiedlichen Ergebnissen führen.
  • Diese Inkonsistenzen treten möglicherweise nicht bei jedem Lauf auf, aber um sie vollständig zu beseitigen, können Sie Ihren Datenrahmen beibehalten (auch bekannt als Cache), eine Spalte (n) neu partitionieren oder Aggregatfunktionen wie groupBy anwenden.

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert.